Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix limiter token tracking (again) #432

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 35 additions & 36 deletions internal/controller/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"fmt"
"reflect"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"

"github.com/google/uuid"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -59,12 +57,13 @@ func New(logger *zap.Logger, scheduler model.JobHandler, maxInFlight int) *MaxIn
func (l *MaxInFlight) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(l); err != nil {
reg, err := jobInformer.AddEventHandler(l)
if err != nil {
return err
}
go factory.Start(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), reg.HasSynced) {
return fmt.Errorf("failed to sync informer cache")
}

Expand Down Expand Up @@ -98,7 +97,7 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
zap.String("uuid", job.Uuid),
)
if err := l.handler.Handle(ctx, job); err != nil {
// Oh well. Return the token and un-record the job.
// Oh well. Return the token.
l.tryReturnToken()

l.logger.Debug("next handler failed",
Expand All @@ -111,54 +110,54 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
}

// OnAdd is called by k8s to inform us a resource is added.
func (l *MaxInFlight) OnAdd(obj any, _ bool) {
func (l *MaxInFlight) OnAdd(obj any, inInitialList bool) {
job, _ := obj.(*batchv1.Job)
if job == nil {
return
}
l.trackJob(job)
l.logger.Debug("at end of OnAdd", zap.Int("tokens-available", len(l.tokenBucket)))
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (l *MaxInFlight) OnUpdate(_, obj any) {
job, _ := obj.(*batchv1.Job)
if job == nil {
if !inInitialList {
// After sync is finished, the limiter handler takes tokens directly.
return
}
l.trackJob(job)
l.logger.Debug("at end of OnUpdate", zap.Int("tokens-available", len(l.tokenBucket)))
// Before sync is finished: we're learning about existing jobs, so we should
// (try to) take tokens for unfinished jobs started by a previous controller.
// If it's added as already finished, no need to take a token for it.
// Otherwise, try to take one, but don't block (in case the stack was
// restarted with a different limit).
if !model.JobFinished(job) {
l.tryTakeToken()
l.logger.Debug("existing not-finished job discovered", zap.Int("tokens-available", len(l.tokenBucket)))
}
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (l *MaxInFlight) OnDelete(obj any) {
// The job condition at the point of deletion could be non-terminal, but
// it is being deleted, so ignore it and skip to marking complete.
// If buildkite.com/job-uuid label is missing or malformed, don't track it.
job, _ := obj.(*batchv1.Job)
if job == nil {
// OnUpdate is called by k8s to inform us a resource is updated.
func (l *MaxInFlight) OnUpdate(prev, curr any) {
prevState, _ := prev.(*batchv1.Job)
currState, _ := curr.(*batchv1.Job)
if prevState == nil || currState == nil {
return
}
l.trackJob(job)
if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil {
return
// Only take or return a token if the job state has *changed*.
// The only valid change is from not-finished to finished.
if !model.JobFinished(prevState) && model.JobFinished(currState) {
l.tryReturnToken()
l.logger.Debug("job state changed from not-finished to finished", zap.Int("tokens-available", len(l.tokenBucket)))
}
l.tryReturnToken()
l.logger.Debug("at end of OnDelete", zap.Int("tokens-available", len(l.tokenBucket)))
}

// trackJob is called by the k8s informer callbacks to update job state and
// take/return tokens. It does the same thing for all three callbacks.
func (l *MaxInFlight) trackJob(job *batchv1.Job) {
// If buildkite.com/job-uuid label is missing or malformed, don't track it.
if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil {
// OnDelete is called by k8s to inform us a resource is deleted.
func (l *MaxInFlight) OnDelete(obj any) {
prevState, _ := obj.(*batchv1.Job)
if prevState == nil {
return
}

if model.JobFinished(job) {
// OnDelete gives us the last-known state prior to deletion.
// If that state was finished, we've already returned a token.
// If that state was not-finished, we need to return a token now.
if !model.JobFinished(prevState) {
l.tryReturnToken()
} else {
l.tryTakeToken()
l.logger.Debug("not-finished job was deleted", zap.Int("tokens-available", len(l.tokenBucket)))
}
}

Expand Down
23 changes: 16 additions & 7 deletions internal/controller/model/fake_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,23 @@ func (f *FakeScheduler) complete(uuid string) {
f.Finished = append(f.Finished, uuid)
f.mu.Unlock()

f.EventHandler.OnUpdate(nil, &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
f.EventHandler.OnUpdate(
// Previous state
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
},
// No status conditions
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}},
},
})
// New state
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}},
},
})
f.wg.Done()
}

Expand Down