Overhaul of MaxInFlightLimiter
DrJosh9000 committed Aug 22, 2024
1 parent 3b9fb54 commit bcb4676
Showing 6 changed files with 336 additions and 251 deletions.
1 change: 0 additions & 1 deletion go.mod
@@ -11,7 +11,6 @@ require (
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.22.0
- github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
1 change: 0 additions & 1 deletion go.sum
@@ -179,7 +179,6 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
- github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
274 changes: 197 additions & 77 deletions internal/controller/scheduler/limiter.go
@@ -8,42 +8,60 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"

"github.com/google/uuid"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// MaxInFlightLimiter is a job handler that wraps another job handler
// (typically the actual job scheduler) and only creates new jobs if the total
// number of jobs currently running is below a limit.
type MaxInFlightLimiter struct {
- scheduler monitor.JobHandler
+ // MaxInFlight sets the upper limit on number of jobs running concurrently
+ // in the cluster. 0 means no limit.
MaxInFlight int

- logger *zap.Logger
- mu sync.RWMutex
- inFlight map[string]struct{}
- completions *sync.Cond
+ // scheduler is the thing that actually schedules jobs in the cluster.
+ scheduler monitor.JobHandler

+ // Logs go here
+ logger *zap.Logger

+ // Map to track in-flight jobs, and mutex to protect it.
+ inFlightMu sync.Mutex
+ inFlight map[uuid.UUID]bool

+ // When a job starts, it takes a token from the bucket.
+ // When a job ends, it puts a token back in the bucket.
+ tokenBucket chan struct{}
}

- func NewLimiter(
- logger *zap.Logger,
- scheduler monitor.JobHandler,
- maxInFlight int,
- ) *MaxInFlightLimiter {
+ // NewLimiter creates a MaxInFlightLimiter.
+ func NewLimiter(logger *zap.Logger, scheduler monitor.JobHandler, maxInFlight int) *MaxInFlightLimiter {
l := &MaxInFlightLimiter{
scheduler: scheduler,
MaxInFlight: maxInFlight,
logger: logger,
- inFlight: make(map[string]struct{}),
+ inFlight: make(map[uuid.UUID]bool),
+ tokenBucket: make(chan struct{}, maxInFlight),
}
- l.completions = sync.NewCond(&l.mu)
+ if maxInFlight <= 0 { // infinite capacity
+ // All receives from a closed channel succeed immediately.
+ close(l.tokenBucket)
+ }
+ for range maxInFlight { // finite capacity
+ // Fill the bucket with tokens.
+ l.tokenBucket <- struct{}{}
+ }
return l
}

- // Creates a Jobs informer, registers the handler on it, and waits for cache sync
- func (l *MaxInFlightLimiter) RegisterInformer(
- ctx context.Context,
- factory informers.SharedInformerFactory,
- ) error {
+ // RegisterInformer registers the limiter to listen for Kubernetes job events,
+ // and waits for cache sync.
+ func (l *MaxInFlightLimiter) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(l); err != nil {
@@ -58,104 +76,206 @@ func (l *MaxInFlightLimiter) RegisterInformer(
return nil
}

// Create either creates the job immediately, or blocks until there is capacity.
// It may also ignore the job if it is already in flight.
func (l *MaxInFlightLimiter) Create(ctx context.Context, job *api.CommandJob) error {
- select {
- case <-ctx.Done():
+ uuid, err := uuid.Parse(job.Uuid)
+ if err != nil {
+ l.logger.Error("invalid UUID in CommandJob", zap.Error(err))
+ return err
+ }
+ if numInFlight, ok := l.casa(uuid, true); !ok {
+ l.logger.Debug("Create: job is already in-flight",
+ zap.String("uuid", job.Uuid),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
+ return nil
+ }

+ // Block until there's a token in the bucket.
+ select {
+ case <-ctx.Done():
return context.Cause(ctx)

- default:
- return l.add(ctx, job)
- }
- }
- func (l *MaxInFlightLimiter) add(ctx context.Context, job *api.CommandJob) error {
- l.mu.Lock()
- defer l.mu.Unlock()
- if _, found := l.inFlight[job.Uuid]; found {
- l.logger.Debug("skipping already queued job", zap.String("uuid", job.Uuid))
- return nil
- }
- for l.MaxInFlight > 0 && len(l.inFlight) >= l.MaxInFlight {
- l.logger.Debug("max-in-flight reached", zap.Int("in-flight", len(l.inFlight)))
- l.completions.Wait()
+ case <-l.tokenBucket:
+ l.logger.Debug("Create: token acquired",
+ zap.String("uuid", uuid.String()),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
}

+ // We got a token from the bucket above! Proceed to schedule the pod.
if err := l.scheduler.Create(ctx, job); err != nil {
+ // Oh well. Return the token and un-record the job.
+ l.tryReturnToken()
+ numInFlight, _ := l.casa(uuid, false)

+ l.logger.Debug("Create: scheduler failed to enqueue job",
+ zap.String("uuid", job.Uuid),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
return err
}
- l.inFlight[job.Uuid] = struct{}{}
return nil
}

- // load jobs at controller startup/restart
- func (l *MaxInFlightLimiter) OnAdd(obj interface{}, isInInitialList bool) {
- l.mu.Lock()
- defer l.mu.Unlock()

+ // OnAdd records jobs as in-flight (if not already).
+ func (l *MaxInFlightLimiter) OnAdd(obj any, _ bool) {
job := obj.(*batchv1.Job)
- if !jobFinished(job) {
- uuid := job.Labels[config.UUIDLabel]
- if _, alreadyInFlight := l.inFlight[uuid]; !alreadyInFlight {
- l.logger.Debug(
- "adding in-flight job",
- zap.String("uuid", uuid),
- zap.Int("in-flight", len(l.inFlight)),
- )
- l.inFlight[uuid] = struct{}{}
- }
- }
- }
+ if jobFinished(job) {
+ // Bit weird for a job to be added as finished, but whatever.
+ return
+ }

+ uuid, err := uuid.Parse(job.Labels[config.UUIDLabel])
+ if err != nil {
+ l.logger.Error("invalid UUID in job label", zap.Error(err))
+ return
+ }
+ numInFlight, ok := l.casa(uuid, true)
+ if !ok {
+ l.logger.Debug("OnAdd: job is already in-flight",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
+ return
+ }

+ // It wasn't recorded as in-flight before, but is now. So it wasn't started
+ // by this instance of the controller (probably a previous instance).
+ // Try to take a token from the bucket to keep it in balance. But because
+ // this job is already running, we don't block waiting for one.
+ l.tryTakeToken()

+ l.logger.Debug(
+ "OnAdd: added previously unknown in-flight job",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
+ }

- // if a job is still running, add it to inFlight, otherwise try to remove it
- func (l *MaxInFlightLimiter) OnUpdate(_, obj interface{}) {
- l.mu.Lock()
- defer l.mu.Unlock()

+ // OnUpdate checks if a job is running. If it is, it records the job as being
+ // in-flight and takes a token, otherwise it removes the record and returns a
+ // token. This is the main mechanism that returns tokens to the bucket.
+ func (l *MaxInFlightLimiter) OnUpdate(_, obj any) {
job := obj.(*batchv1.Job)
- uuid := job.Labels[config.UUIDLabel]
if jobFinished(job) {
l.markComplete(job)
- } else {
- if _, alreadyInFlight := l.inFlight[uuid]; !alreadyInFlight {
- l.logger.Debug("waiting for job completion", zap.String("uuid", uuid))
- l.inFlight[uuid] = struct{}{}
- }
+ return
}
- }

+ // The job is still going.
+ uuid, err := uuid.Parse(job.Labels[config.UUIDLabel])
+ if err != nil {
+ l.logger.Error("invalid UUID in job label", zap.Error(err))
+ return
+ }
+ numInFlight, ok := l.casa(uuid, true)
+ if !ok {
+ l.logger.Debug("OnUpdate: job was already in-flight",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
+ return
+ }
+ // Similar logic to OnAdd.
+ l.tryTakeToken()

+ l.logger.Debug("OnUpdate: added previously unknown in-flight job",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
+ }

- // if jobs are deleted before they complete, ensure we remove them from inFlight
- func (l *MaxInFlightLimiter) OnDelete(obj interface{}) {
- l.mu.Lock()
- defer l.mu.Unlock()

+ // OnDelete removes deleted jobs from in-flight tracking, and returns tokens.
+ // This is needed in case jobs are evicted some other way (e.g. manually).
+ func (l *MaxInFlightLimiter) OnDelete(obj any) {
l.markComplete(obj.(*batchv1.Job))
}

func (l *MaxInFlightLimiter) markComplete(job *batchv1.Job) {
- uuid := job.Labels[config.UUIDLabel]
- if _, alreadyInFlight := l.inFlight[uuid]; alreadyInFlight {
- delete(l.inFlight, uuid)
- l.logger.Debug(
- "job complete",
- zap.String("uuid", uuid),
- zap.Int("in-flight", len(l.inFlight)),
+ uuid, err := uuid.Parse(job.Labels[config.UUIDLabel])
+ if err != nil {
+ l.logger.Error("invalid UUID in job label", zap.Error(err))
+ return
+ }
+ numInFlight, ok := l.casa(uuid, false)
+ if !ok {
+ l.logger.Debug("markComplete: job was already not in-flight",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
)
- l.completions.Signal()
+ return
}
- }

- func (l *MaxInFlightLimiter) InFlight() int {
- l.mu.Lock()
- defer l.mu.Unlock()
+ // It was previously recorded as in-flight, now it is not. So we can
+ // return a token to the bucket. But we shouldn't block trying to do that:
+ // this may have been a job started by a previous instance of the controller
+ // with a higher MaxInFlight than this instance.
+ l.tryReturnToken()

- return len(l.inFlight)
+ l.logger.Debug("job complete",
+ zap.String("uuid", uuid.String()),
+ zap.Int("num-in-flight", numInFlight),
+ zap.Int("available-tokens", len(l.tokenBucket)),
+ )
}

// jobFinished reports if the job has a Complete or Failed status condition.
func jobFinished(job *batchv1.Job) bool {
for _, cond := range job.Status.Conditions {
switch cond.Type {
case batchv1.JobComplete, batchv1.JobFailed:
// Per the API docs, these are the only terminal job conditions.
return true
}
}
return false
}

// tryTakeToken takes a token from the bucket, if one is available. It does not
// block.
func (l *MaxInFlightLimiter) tryTakeToken() {
select {
case <-l.tokenBucket:
default:
}
}

// tryReturnToken returns a token to the bucket, if not closed or full. It does
// not block.
func (l *MaxInFlightLimiter) tryReturnToken() {
if l.MaxInFlight <= 0 {
return
}
select {
case l.tokenBucket <- struct{}{}:
default:
}
}

// casa is an atomic compare-and-swap-like primitive.
//
// It attempts to update the state of the job from !x to x, and reports
// the in-flight count (after the operation) and whether it was able to change
// the state, i.e. it returns false if the in-flight state of the job was
// already equal to x.
func (l *MaxInFlightLimiter) casa(id uuid.UUID, x bool) (int, bool) {
l.inFlightMu.Lock()
defer l.inFlightMu.Unlock()
if l.inFlight[id] == x {
return len(l.inFlight), false
}
if x {
l.inFlight[id] = true
} else {
delete(l.inFlight, id)
}
return len(l.inFlight), true
}
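
The heart of the overhaul is the token-bucket channel: Create blocks on a receive until a token is free, the informer callbacks return tokens as jobs finish, and a closed channel stands in for "no limit" because receives from a closed channel never block. Below is a minimal, self-contained sketch of that pattern, for illustration only — the helper names (newTokenBucket, acquire, release) are invented here and are not part of this commit:

package main

import (
    "context"
    "fmt"
)

// newTokenBucket builds the bucket: one token per allowed in-flight job.
// maxInFlight <= 0 means "no limit", modelled by closing the channel so
// every receive succeeds immediately.
func newTokenBucket(maxInFlight int) chan struct{} {
    if maxInFlight <= 0 {
        bucket := make(chan struct{})
        close(bucket)
        return bucket
    }
    bucket := make(chan struct{}, maxInFlight)
    for range maxInFlight { // range-over-int needs Go 1.22+
        bucket <- struct{}{}
    }
    return bucket
}

// acquire blocks until a token is available or the context is cancelled.
func acquire(ctx context.Context, bucket chan struct{}) error {
    select {
    case <-ctx.Done():
        return context.Cause(ctx)
    case <-bucket:
        return nil
    }
}

// release puts a token back without blocking; it is a no-op when the
// bucket is unlimited (closed) or already full.
func release(bucket chan struct{}, maxInFlight int) {
    if maxInFlight <= 0 {
        return
    }
    select {
    case bucket <- struct{}{}:
    default:
    }
}

func main() {
    const maxInFlight = 2
    bucket := newTokenBucket(maxInFlight)

    for i := 0; i < 3; i++ {
        if err := acquire(context.Background(), bucket); err != nil {
            fmt.Println("acquire failed:", err)
            return
        }
        fmt.Printf("job %d scheduled, tokens left: %d\n", i, len(bucket))
        release(bucket, maxInFlight) // pretend the job finished
    }
}

As in tryReturnToken, release uses a non-blocking send: if the bucket is already full (for example after a restart with a lower MaxInFlight), the surplus token is simply dropped.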

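The casa helper is what keeps that bucket honest when the same job is reported through more than one path (a Create call from the monitor plus informer OnAdd/OnUpdate events). Here is a small standalone sketch of the same compare-and-swap-on-a-map idiom; the tracker type is a hypothetical name used only for this illustration:

package main

import (
    "fmt"
    "sync"

    "github.com/google/uuid"
)

// tracker mirrors the limiter's bookkeeping: a mutex-guarded map of
// in-flight job UUIDs.
type tracker struct {
    mu       sync.Mutex
    inFlight map[uuid.UUID]bool
}

// casa flips the job's in-flight state to x only if it is not already x,
// reporting the resulting count and whether the state actually changed.
func (t *tracker) casa(id uuid.UUID, x bool) (int, bool) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if t.inFlight[id] == x {
        return len(t.inFlight), false
    }
    if x {
        t.inFlight[id] = true
    } else {
        delete(t.inFlight, id)
    }
    return len(t.inFlight), true
}

func main() {
    t := &tracker{inFlight: make(map[uuid.UUID]bool)}
    id := uuid.New()

    // Two concurrent callers race to record the same job.
    var wg sync.WaitGroup
    results := make(chan bool, 2)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _, changed := t.casa(id, true)
            results <- changed
        }()
    }
    wg.Wait()
    close(results)

    for changed := range results {
        fmt.Println("recorded job:", changed)
    }
}

Exactly one of the two racing goroutines sees changed == true, so only one caller goes on to take (or later return) a token — which is how the limiter avoids double-counting a job it already knows about.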