Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job watcher #442

Merged
merged 6 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .buildkite/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions charts/agent-stack-k8s/templates/rbac.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
15 changes: 15 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func AddConfigFlags(cmd *cobra.Command) {
)
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")

cmd.Flags().Duration(
"stale-job-data-timeout",
config.DefaultStaleJobDataTimeout,
"Duration after querying jobs in Buildkite that the data is considered valid",
)
cmd.Flags().Int(
"job-creation-concurrency",
config.DefaultJobCreationConcurrency,
"Number of concurrent goroutines to run for converting Buildkite jobs into Kubernetes jobs",
)
cmd.Flags().Duration(
"image-pull-backoff-grace-period",
config.DefaultImagePullBackOffGracePeriod,
Expand All @@ -100,6 +110,11 @@ func AddConfigFlags(cmd *cobra.Command) {
config.DefaultJobCancelCheckerPollInterval,
"Controls the interval between job state queries while a pod is still Pending",
)
cmd.Flags().Duration(
"empty-job-grace-period",
config.DefaultEmptyJobGracePeriod,
"Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)",
)
cmd.Flags().Bool(
"prohibit-kubernetes-plugin",
false,
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) {
JobTTL: 300 * time.Second,
ImagePullBackOffGracePeriod: 60 * time.Second,
JobCancelCheckerPollInterval: 10 * time.Second,
EmptyJobGracePeriod: 50 * time.Second,
PollInterval: 5 * time.Second,
StaleJobDataTimeout: 10 * time.Second,
JobCreationConcurrency: 5,
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest
job-ttl: 5m
image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
empty-job-grace-period: 50s
poll-interval: 5s
stale-job-data-timeout: 10s
job-creation-concurrency: 5
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ const (
BuildURLAnnotation = "buildkite.com/build-url"
JobURLAnnotation = "buildkite.com/job-url"
DefaultNamespace = "default"
DefaultStaleJobDataTimeout = 10 * time.Second
DefaultImagePullBackOffGracePeriod = 30 * time.Second
DefaultJobCancelCheckerPollInterval = 5 * time.Second
DefaultEmptyJobGracePeriod = 30 * time.Second
DefaultJobCreationConcurrency = 5
)

var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
Expand Down Expand Up @@ -49,6 +52,7 @@ type Config struct {
PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"`
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`

// WorkspaceVolume allows supplying a volume for /workspace. By default
// an EmptyDir volume is created for it.
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,20 @@ func Run(
logger.Fatal("failed to register completions informer", zap.Error(err))
}

// JobWatcher watches for jobs in bad conditions to clean up:
// * Jobs that fail without ever creating a pod
// * Jobs that stall forever without ever creating a pod
jobWatcher := scheduler.NewJobWatcher(
logger.Named("jobWatcher"),
k8sClient,
cfg,
)
if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil {
logger.Fatal("failed to register jobWatcher informer", zap.Error(err))
}

// PodWatcher watches for other conditions to clean up pods:
// * Pods where an init container failed for any reason
// * Pods where a container is in ImagePullBackOff for too long
// * Pods that are still pending, but the Buildkite job has been cancelled
podWatcher := scheduler.NewPodWatcher(
Expand Down
5 changes: 3 additions & 2 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Khan/genqlient/graphql"
"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -46,12 +47,12 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er

// Default StaleJobDataTimeout to 10s.
if cfg.StaleJobDataTimeout <= 0 {
cfg.StaleJobDataTimeout = 10 * time.Second
cfg.StaleJobDataTimeout = config.DefaultStaleJobDataTimeout
}

// Default CreationConcurrency to 5.
if cfg.JobCreationConcurrency <= 0 {
cfg.JobCreationConcurrency = 5
cfg.JobCreationConcurrency = config.DefaultJobCreationConcurrency
}

return &Monitor{
Expand Down
23 changes: 15 additions & 8 deletions internal/controller/scheduler/completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"

"go.uber.org/zap"

v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,9 +17,18 @@ import (
"k8s.io/utils/ptr"
)

// defaultTermGracePeriodSeconds is the ActiveDeadlineSeconds value that
// cleanupSidecars applies to a job once the agent container in its pod has
// terminated.
const defaultTermGracePeriodSeconds = 60

// completionsWatcher is the event handler that RegisterInformer attaches to
// the Pods informer. Its OnAdd and OnUpdate callbacks hand each pod to
// cleanupSidecars.
type completionsWatcher struct {
logger *zap.Logger
// k8s is the Kubernetes client; cleanupSidecars uses it to look up the
// job named by the pod's "job-name" label.
k8s kubernetes.Interface

// This is the context passed to RegisterInformer.
// It's being stored here (grrrr!) because the k8s ResourceEventHandler
// interface doesn't have context args. (Working around an interface in a
// library outside of our control is a carve-out from the usual rule.)
// The context is needed to ensure goroutines are cleaned up.
resourceEventHandlerCtx context.Context
}

func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *completionsWatcher {
Expand All @@ -30,14 +40,12 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp
}

// Creates a Pods informer and registers the handler on it
func (w *completionsWatcher) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
// RegisterInformer obtains the Pods informer from factory, registers w as an
// event handler on it, and starts the factory using ctx.Done() as the stop
// channel.
//
// ctx is also retained on the watcher so that OnAdd and OnUpdate can pass it
// to cleanupSidecars. It returns the error from AddEventHandler, if any.
func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
	// Store the context BEFORE registering the handler. The informer comes
	// from a shared factory, so it may already be running; in that case the
	// handler can be invoked as soon as it is added, and it must not observe
	// a nil context.
	w.resourceEventHandlerCtx = ctx // see note on field

	informer := factory.Core().V1().Pods().Informer()
	if _, err := informer.AddEventHandler(w); err != nil {
		return err
	}
	go factory.Start(ctx.Done())
	return nil
}
Expand All @@ -49,7 +57,7 @@ func (w *completionsWatcher) OnDelete(obj any) {}
func (w *completionsWatcher) OnAdd(obj any, isInInitialList bool) {
completionWatcherOnAddEventCounter.Inc()
pod := obj.(*v1.Pod)
w.cleanupSidecars(pod)
w.cleanupSidecars(w.resourceEventHandlerCtx, pod)
}

func (w *completionsWatcher) OnUpdate(old any, new any) {
Expand All @@ -62,15 +70,15 @@ func (w *completionsWatcher) OnUpdate(old any, new any) {
}

newPod := new.(*v1.Pod)
w.cleanupSidecars(newPod)
w.cleanupSidecars(w.resourceEventHandlerCtx, newPod)
}

// cleanupSidecars first checks if the container status of the agent container
// in the pod is Terminated. If so, it ensures the job is cleaned up by updating
// it with an ActiveDeadlineSeconds value (defaultTermGracePeriodSeconds).
// (So this is not actually sidecar-specific, but is needed because sidecars
// would otherwise cause the pod to continue running.)
func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
func (w *completionsWatcher) cleanupSidecars(ctx context.Context, pod *v1.Pod) {
terminated := getTermination(pod)
if terminated == nil {
return
Expand All @@ -82,7 +90,6 @@ func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
)

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ctx := context.TODO()
job, err := w.k8s.BatchV1().Jobs(pod.Namespace).Get(ctx, pod.Labels["job-name"], metav1.GetOptions{})
if err != nil {
return err
Expand Down
39 changes: 37 additions & 2 deletions internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"os"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/version"

agentcore "github.com/buildkite/agent/v3/core"
Expand All @@ -16,9 +18,42 @@ import (
"k8s.io/client-go/kubernetes"
)

// failJob fails the job in Buildkite. agentToken needs to be the token value.
// acquireAndFailForObject figures out how to fail the BK job corresponding to
// the k8s object (a pod or job) by inspecting the object's labels.
//
// The Buildkite job UUID is read from the config.UUIDLabel label, and the
// agent tags are derived from the same labels. The agent token is fetched
// from the cfg.AgentTokenSecret secret in the object's namespace. message is
// passed through to acquireAndFail.
//
// It returns a non-nil error if the UUID label is missing or empty, if the
// agent token cannot be fetched, or if acquireAndFail fails. Each failure is
// also logged on logger.
func acquireAndFailForObject(
	ctx context.Context,
	logger *zap.Logger,
	k8sClient kubernetes.Interface,
	cfg *config.Config,
	obj metav1.Object,
	message string,
) error {
	// Check the labels first: this needs nothing but the object itself, so
	// there is no point fetching the token secret for an object that cannot
	// be mapped to a Buildkite job.
	labels := obj.GetLabels()
	jobUUID := labels[config.UUIDLabel]
	if jobUUID == "" {
		logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
		return errors.New("missing UUID label")
	}

	agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
	if err != nil {
		logger.Error("fetching agent token from secret", zap.Error(err))
		return fmt.Errorf("fetching agent token from secret: %w", err)
	}

	// Matching tags are required in order to connect the temporary agent.
	tags := agenttags.TagsFromLabels(labels)
	opts := cfg.AgentConfig.ControllerOptions()

	if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
		logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
		return fmt.Errorf("acquiring and failing job %s: %w", jobUUID, err)
	}
	return nil
}

// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
// Use fetchAgentToken to fetch it from the k8s secret.
func failJob(
func acquireAndFail(
ctx context.Context,
zapLogger *zap.Logger,
agentToken string,
Expand Down
Loading