Overhaul of MaxInFlightLimiter #370

Merged: 1 commit, Aug 22, 2024
1 change: 0 additions & 1 deletion go.mod
@@ -11,7 +11,6 @@ require (
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.22.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
1 change: 0 additions & 1 deletion go.sum
@@ -179,7 +179,6 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
261 changes: 183 additions & 78 deletions internal/controller/scheduler/limiter.go
@@ -8,42 +8,60 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"

"github.com/google/uuid"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// MaxInFlightLimiter is a job handler that wraps another job handler
// (typically the actual job scheduler) and only creates new jobs if the total
// number of jobs currently running is below a limit.
type MaxInFlightLimiter struct {
scheduler monitor.JobHandler
// MaxInFlight sets the upper limit on the number of jobs running
// concurrently in the cluster. 0 means no limit.
MaxInFlight int

logger *zap.Logger
mu sync.RWMutex
inFlight map[string]struct{}
completions *sync.Cond
// scheduler is the thing that actually schedules jobs in the cluster.
scheduler monitor.JobHandler

// Logs go here
logger *zap.Logger

// Map to track in-flight jobs, and mutex to protect it.
inFlightMu sync.Mutex
inFlight map[uuid.UUID]bool

// When a job starts, it takes a token from the bucket.
// When a job ends, it puts a token back in the bucket.
tokenBucket chan struct{}
}

func NewLimiter(
logger *zap.Logger,
scheduler monitor.JobHandler,
maxInFlight int,
) *MaxInFlightLimiter {
// NewLimiter creates a MaxInFlightLimiter.
func NewLimiter(logger *zap.Logger, scheduler monitor.JobHandler, maxInFlight int) *MaxInFlightLimiter {
l := &MaxInFlightLimiter{
scheduler: scheduler,
MaxInFlight: maxInFlight,
logger: logger,
inFlight: make(map[string]struct{}),
inFlight: make(map[uuid.UUID]bool),
tokenBucket: make(chan struct{}, maxInFlight),
}
if maxInFlight <= 0 { // infinite capacity
// All receives from a closed channel succeed immediately.
close(l.tokenBucket)
}
for range maxInFlight { // finite capacity
// Fill the bucket with tokens.
l.tokenBucket <- struct{}{}
}
l.completions = sync.NewCond(&l.mu)
return l
}
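
// Illustrative sketch, not part of this change: constructing limiters with
// the two MaxInFlight modes. The values 25 and 0 are arbitrary examples.
// With a positive limit the bucket is pre-filled with that many tokens;
// with 0 the bucket is closed, so every receive succeeds and there is no
// limit.
func exampleNewLimiter(logger *zap.Logger, scheduler monitor.JobHandler) *MaxInFlightLimiter {
	limited := NewLimiter(logger, scheduler, 25) // at most 25 jobs in flight
	_ = NewLimiter(logger, scheduler, 0)         // 0 disables the limit
	return limited
}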

// Creates a Jobs informer, registers the handler on it, and waits for cache sync
func (l *MaxInFlightLimiter) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
// RegisterInformer registers the limiter to listen for Kubernetes job events,
// and waits for cache sync.
func (l *MaxInFlightLimiter) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(l); err != nil {
@@ -58,104 +76,191 @@ func (l *MaxInFlightLimiter) RegisterInformer(
return nil
}
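
// Illustrative sketch, not part of this change: wiring the limiter into a
// shared informer factory. Assumes a kubernetes.Interface clientset (import
// "k8s.io/client-go/kubernetes") obtained elsewhere; the zero resync period
// is an arbitrary choice for this example.
func exampleRegisterInformer(ctx context.Context, l *MaxInFlightLimiter, clientset kubernetes.Interface) error {
	factory := informers.NewSharedInformerFactory(clientset, 0)
	// The limiter becomes the event handler for batch/v1 Jobs, and this call
	// waits for the informer cache to sync before returning.
	return l.RegisterInformer(ctx, factory)
}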

// Create either creates the job immediately, or blocks until there is capacity.
// It may also ignore the job if it is already in flight.
func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) error {
select {
case <-ctx.Done():
uuid, err := uuid.Parse(job.Uuid)
if err != nil {
l.logger.Error("invalid UUID in CommandJob", zap.Error(err))
return err
}
if numInFlight, ok := l.casa(uuid, true); !ok {
l.logger.Debug("Create: job is already in-flight",
zap.String("uuid", job.Uuid),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
return nil
default:
return l.add(ctx, job)
}
}

func (l *MaxInFlightLimiter) add(ctx context.Context, job *api.CommandJob) error {
l.mu.Lock()
defer l.mu.Unlock()
// Block until there's a token in the bucket.
select {
case <-ctx.Done():
return context.Cause(ctx)

if _, found := l.inFlight[job.Uuid]; found {
l.logger.Debug("skipping already queued job", zap.String("uuid", job.Uuid))
return nil
}
for l.MaxInFlight > 0 && len(l.inFlight) >= l.MaxInFlight {
l.logger.Debug("max-in-flight reached", zap.Int("in-flight", len(l.inFlight)))
l.completions.Wait()
case <-l.tokenBucket:
l.logger.Debug("Create: token acquired",
zap.String("uuid", uuid.String()),
zap.Int("available-tokens", len(l.tokenBucket)),
)
}

// We got a token from the bucket above! Proceed to schedule the pod.
if err := l.scheduler.Create(ctx, job); err != nil {
// Oh well. Return the token and un-record the job.
l.tryReturnToken()
numInFlight, _ := l.casa(uuid, false)

l.logger.Debug("Create: scheduler failed to enqueue job",
zap.String("uuid", job.Uuid),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
return err
}
l.inFlight[job.Uuid] = struct{}{}
return nil
}
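
// Illustrative sketch, not part of this change: a hypothetical caller of
// Create. When every token is taken, Create blocks until a running job
// finishes or ctx is cancelled, in which case it returns context.Cause(ctx).
func exampleCreate(ctx context.Context, l *MaxInFlightLimiter, job *api.CommandJob) {
	if err := l.Create(ctx, job); err != nil {
		// The job was not handed to the wrapped scheduler.
		l.logger.Warn("example: job not scheduled", zap.Error(err))
	}
}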

// load jobs at controller startup/restart
func (l *MaxInFlightLimiter) OnAdd(obj interface{}, isInInitialList bool) {
l.mu.Lock()
defer l.mu.Unlock()
// OnAdd is called by k8s to inform us a resource is added.
func (l *MaxInFlightLimiter) OnAdd(obj any, _ bool) {
l.trackJob(obj.(*batchv1.Job))
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (l *MaxInFlightLimiter) OnUpdate(_, obj any) {
l.trackJob(obj.(*batchv1.Job))
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (l *MaxInFlightLimiter) OnDelete(obj any) {
// The job condition at the point of deletion could be non-terminal, so
// we ignore it and skip to marking complete.
job := obj.(*batchv1.Job)
if !jobFinished(job) {
uuid := job.Labels[config.UUIDLabel]
if _, alreadyInFlight := l.inFlight[uuid]; !alreadyInFlight {
l.logger.Debug(
"adding in-flight job",
zap.String("uuid", uuid),
zap.Int("in-flight", len(l.inFlight)),
)
l.inFlight[uuid] = struct{}{}
}
id, err := uuid.Parse(job.Labels[config.UUIDLabel])
if err != nil {
l.logger.Error("invalid UUID in job label", zap.Error(err))
return
}
l.markComplete(id)
}

// if a job is still running, add it to inFlight, otherwise try to remove it
func (l *MaxInFlightLimiter) OnUpdate(_, obj interface{}) {
l.mu.Lock()
defer l.mu.Unlock()

job := obj.(*batchv1.Job)
uuid := job.Labels[config.UUIDLabel]
// trackJob is called by the k8s informer callbacks to update job state and
// take/return tokens. It does the same thing for all three callbacks because
// the appropriate action depends only on whether the job has finished.
func (l *MaxInFlightLimiter) trackJob(job *batchv1.Job) {
id, err := uuid.Parse(job.Labels[config.UUIDLabel])
if err != nil {
l.logger.Error("invalid UUID in job label", zap.Error(err))
return
}
if jobFinished(job) {
l.markComplete(job)
l.markComplete(id)
} else {
if _, alreadyInFlight := l.inFlight[uuid]; !alreadyInFlight {
l.logger.Debug("waiting for job completion", zap.String("uuid", uuid))
l.inFlight[uuid] = struct{}{}
}
l.markRunning(id)
}
}

// if jobs are deleted before they complete, ensure we remove them from inFlight
func (l *MaxInFlightLimiter) OnDelete(obj interface{}) {
l.mu.Lock()
defer l.mu.Unlock()
// markRunning records a job as in-flight.
func (l *MaxInFlightLimiter) markRunning(id uuid.UUID) {
// Change state from not in-flight to in-flight.
numInFlight, ok := l.casa(id, true)
if !ok {
l.logger.Debug("markRunning: job is already in-flight",
zap.String("uuid", id.String()),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
return
}

// It wasn't recorded as in-flight before, but is now. So it wasn't started
// by this instance of the controller (probably a previous instance).
// Try to take a token from the bucket to keep it in balance. But because
// this job is already running, we don't block waiting for one.
l.tryTakeToken()

l.markComplete(obj.(*batchv1.Job))
l.logger.Debug(
"markRunning: added previously unknown in-flight job",
zap.String("uuid", id.String()),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
}

func (l *MaxInFlightLimiter) markComplete(job *batchv1.Job) {
uuid := job.Labels[config.UUIDLabel]
if _, alreadyInFlight := l.inFlight[uuid]; alreadyInFlight {
delete(l.inFlight, uuid)
l.logger.Debug(
"job complete",
zap.String("uuid", uuid),
zap.Int("in-flight", len(l.inFlight)),
// markComplete records a job as not in-flight.
func (l *MaxInFlightLimiter) markComplete(id uuid.UUID) {
// Change state from in-flight to not in-flight.
numInFlight, ok := l.casa(id, false)
if !ok {
l.logger.Debug("markComplete: job was already not in-flight",
zap.String("uuid", id.String()),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
l.completions.Signal()
return
}
}

func (l *MaxInFlightLimiter) InFlight() int {
l.mu.Lock()
defer l.mu.Unlock()
// It was previously recorded as in-flight, now it is not. So we can
// return a token to the bucket. But we shouldn't block trying to do that:
// this may have been a job started by a previous instance of the controller
// with a higher MaxInFlight than this instance.
l.tryReturnToken()

return len(l.inFlight)
l.logger.Debug("markComplete: job complete",
zap.String("uuid", id.String()),
zap.Int("num-in-flight", numInFlight),
zap.Int("available-tokens", len(l.tokenBucket)),
)
}

// jobFinished reports if the job has a Complete or Failed status condition.
func jobFinished(job *batchv1.Job) bool {
for _, cond := range job.Status.Conditions {
switch cond.Type {
case batchv1.JobComplete, batchv1.JobFailed:
// Per the API docs, these are the only terminal job conditions.
return true
}
}
return false
}

// tryTakeToken takes a token from the bucket, if one is available. It does not
// block.
func (l *MaxInFlightLimiter) tryTakeToken() {
select {
case <-l.tokenBucket:
default:
}
}

// tryReturnToken returns a token to the bucket, if not closed or full. It does
// not block.
func (l *MaxInFlightLimiter) tryReturnToken() {
if l.MaxInFlight <= 0 {
return
}
select {
case l.tokenBucket <- struct{}{}:
default:
}
}
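
// Illustrative sketch, not part of this change: the non-blocking select
// pattern that tryTakeToken and tryReturnToken rely on, shown in isolation.
// A select with a default case never waits, which is why markRunning and
// markComplete can rebalance tokens without blocking the informer callbacks.
func exampleNonBlockingTake(bucket chan struct{}) bool {
	select {
	case <-bucket: // a token was available (or the channel is closed)
		return true
	default: // no token right now; carry on without one
		return false
	}
}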

// casa is an atomic compare-and-swap-like primitive.
//
// It attempts to update the state of the job from !x to x, and reports
// the in-flight count (after the operation) and whether it was able to change
// the state, i.e. it returns false if the in-flight state of the job was
// already equal to x.
func (l *MaxInFlightLimiter) casa(id uuid.UUID, x bool) (int, bool) {
l.inFlightMu.Lock()
defer l.inFlightMu.Unlock()
if l.inFlight[id] == x {
return len(l.inFlight), false
}
if x {
l.inFlight[id] = true
} else {
delete(l.inFlight, id)
}
return len(l.inFlight), true
}
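
// Illustrative sketch, not part of this change: how the casa return values
// read at a call site. The UUID is a made-up example value.
func exampleCasa(l *MaxInFlightLimiter) {
	id := uuid.MustParse("00000000-0000-0000-0000-000000000001")
	if n, ok := l.casa(id, true); ok {
		// The job was newly marked in-flight; n is the new in-flight count.
		l.logger.Debug("example: marked in-flight", zap.Int("num-in-flight", n))
	} else {
		// The job was already in-flight, so nothing changed.
		l.logger.Debug("example: already in-flight", zap.Int("num-in-flight", n))
	}
}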