diff --git a/internal/controller/deduper/deduper.go b/internal/controller/deduper/deduper.go index e34b763e..e361bff8 100644 --- a/internal/controller/deduper/deduper.go +++ b/internal/controller/deduper/deduper.go @@ -32,12 +32,12 @@ type Deduper struct { // New creates a Deduper. func New(logger *zap.Logger, handler model.JobHandler) *Deduper { - l := &Deduper{ + d := &Deduper{ handler: handler, logger: logger, inFlight: make(map[uuid.UUID]bool), } - return l + return d } // RegisterInformer registers the limiter to listen for Kubernetes job events, @@ -45,12 +45,13 @@ func New(logger *zap.Logger, handler model.JobHandler) *Deduper { func (d *Deduper) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error { informer := factory.Batch().V1().Jobs() jobInformer := informer.Informer() - if _, err := jobInformer.AddEventHandler(d); err != nil { + reg, err := jobInformer.AddEventHandler(d) + if err != nil { return err } go factory.Start(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), reg.HasSynced) { return fmt.Errorf("failed to sync informer cache") } @@ -94,60 +95,76 @@ func (d *Deduper) Handle(ctx context.Context, job model.Job) error { } // OnAdd is called by k8s to inform us a resource is added. -func (d *Deduper) OnAdd(obj any, _ bool) { +func (d *Deduper) OnAdd(obj any, inInitialList bool) { job, _ := obj.(*batchv1.Job) if job == nil { return } - d.trackJob(job) + + // Once the initial list is complete, we are told about new jobs in Handle. + if !inInitialList { + return + } + id, err := uuid.Parse(job.Labels[config.UUIDLabel]) + if err != nil { + d.logger.Error("invalid UUID in job label", zap.Error(err)) + return + } + d.markRunning(id, "OnAdd") } // OnUpdate is called by k8s to inform us a resource is updated. -func (d *Deduper) OnUpdate(_, obj any) { - job, _ := obj.(*batchv1.Job) - if job == nil { +func (d *Deduper) OnUpdate(prev, curr any) { + prevState, _ := prev.(*batchv1.Job) + currState, _ := curr.(*batchv1.Job) + if prevState == nil || currState == nil { return } - d.trackJob(job) -} -// OnDelete is called by k8s to inform us a resource is deleted. -func (d *Deduper) OnDelete(obj any) { - // The job condition at the point of deletion could be non-terminal, so - // we ignore it and skip to marking complete. - job, _ := obj.(*batchv1.Job) - if job == nil { + // If this update is not a transition from not-finished to finished, skip + // the rest of the work. + if model.JobFinished(prevState) || !model.JobFinished(currState) { return } - id, err := uuid.Parse(job.Labels[config.UUIDLabel]) + + id, err := uuid.Parse(currState.Labels[config.UUIDLabel]) if err != nil { d.logger.Error("invalid UUID in job label", zap.Error(err)) return } - d.markComplete(id) + d.markNotRunning(id, "OnUpdate") + } -// trackJob is called by the k8s informer callbacks to update job state. -func (d *Deduper) trackJob(job *batchv1.Job) { - id, err := uuid.Parse(job.Labels[config.UUIDLabel]) +// OnDelete is called by k8s to inform us a resource is deleted. +func (d *Deduper) OnDelete(prev any) { + // The job condition at the point of deletion could be non-terminal, so + // we ignore it and skip to marking complete. + prevState, _ := prev.(*batchv1.Job) + if prevState == nil { + return + } + // If the last job state before delete was observed was finished, then + // OnUpdate should have already marked it as complete. + if model.JobFinished(prevState) { + return + } + id, err := uuid.Parse(prevState.Labels[config.UUIDLabel]) if err != nil { d.logger.Error("invalid UUID in job label", zap.Error(err)) return } - if model.JobFinished(job) { - d.markComplete(id) - } else { - d.markRunning(id) - } + d.markNotRunning(id, "OnDelete") } // markRunning records a job as in-flight. -func (d *Deduper) markRunning(id uuid.UUID) { +func (d *Deduper) markRunning(id uuid.UUID, source string) { // Change state from not in-flight to in-flight. numInFlight, ok := d.casa(id, true) if !ok { d.logger.Debug("markRunning: job was already in-flight!", zap.String("uuid", id.String()), + zap.String("source", source), zap.Int("num-in-flight", numInFlight), ) return @@ -156,24 +173,27 @@ func (d *Deduper) markRunning(id uuid.UUID) { d.logger.Debug( "markRunning: added previously unknown in-flight job", zap.String("uuid", id.String()), + zap.String("source", source), zap.Int("num-in-flight", numInFlight), ) } -// markComplete records a job as not in-flight. -func (d *Deduper) markComplete(id uuid.UUID) { +// markNotRunning records a job as not in-flight. +func (d *Deduper) markNotRunning(id uuid.UUID, source string) { // Change state from in-flight to not in-flight. numInFlight, ok := d.casa(id, false) if !ok { - d.logger.Debug("markComplete: job was already not-in-flight!", + d.logger.Debug("markNotRunning: job was already not-in-flight!", zap.String("uuid", id.String()), + zap.String("source", source), zap.Int("num-in-flight", numInFlight), ) return } - d.logger.Debug("markComplete: job complete", + d.logger.Debug("markNotRunning: job complete", zap.String("uuid", id.String()), + zap.String("source", source), zap.Int("num-in-flight", numInFlight), ) } @@ -187,6 +207,7 @@ func (d *Deduper) markComplete(id uuid.UUID) { func (d *Deduper) casa(id uuid.UUID, x bool) (int, bool) { d.inFlightMu.Lock() defer d.inFlightMu.Unlock() + if d.inFlight[id] == x { return len(d.inFlight), false }