Skip to content

Commit

Permalink
Refine informer usage in deduper
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Nov 26, 2024
1 parent e166db8 commit 14a52fc
Showing 1 changed file with 53 additions and 32 deletions.
85 changes: 53 additions & 32 deletions internal/controller/deduper/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,26 @@ type Deduper struct {

// New creates a Deduper.
func New(logger *zap.Logger, handler model.JobHandler) *Deduper {
l := &Deduper{
d := &Deduper{
handler: handler,
logger: logger,
inFlight: make(map[uuid.UUID]bool),
}
return l
return d
}

// RegisterInformer registers the deduper to listen for Kubernetes job events,
// and waits for cache sync.
func (d *Deduper) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(d); err != nil {
reg, err := jobInformer.AddEventHandler(d)
if err != nil {
return err
}
go factory.Start(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), reg.HasSynced) {
return fmt.Errorf("failed to sync informer cache")
}

Expand Down Expand Up @@ -94,60 +95,76 @@ func (d *Deduper) Handle(ctx context.Context, job model.Job) error {
}

// OnAdd is called by k8s to inform us a resource is added.
func (d *Deduper) OnAdd(obj any, _ bool) {
func (d *Deduper) OnAdd(obj any, inInitialList bool) {
job, _ := obj.(*batchv1.Job)
if job == nil {
return
}
d.trackJob(job)

// Once the initial list is complete, we are told about new jobs in Handle.
if !inInitialList {
return
}
id, err := uuid.Parse(job.Labels[config.UUIDLabel])
if err != nil {
d.logger.Error("invalid UUID in job label", zap.Error(err))
return
}
d.markRunning(id, "OnAdd")
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (d *Deduper) OnUpdate(_, obj any) {
job, _ := obj.(*batchv1.Job)
if job == nil {
func (d *Deduper) OnUpdate(prev, curr any) {
prevState, _ := prev.(*batchv1.Job)
currState, _ := curr.(*batchv1.Job)
if prevState == nil || currState == nil {
return
}
d.trackJob(job)
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (d *Deduper) OnDelete(obj any) {
// The job condition at the point of deletion could be non-terminal, so
// we ignore it and skip to marking complete.
job, _ := obj.(*batchv1.Job)
if job == nil {
// If this update is not a transition from not-finished to finished, skip
// the rest of the work.
if model.JobFinished(prevState) || !model.JobFinished(currState) {
return
}
id, err := uuid.Parse(job.Labels[config.UUIDLabel])

id, err := uuid.Parse(currState.Labels[config.UUIDLabel])
if err != nil {
d.logger.Error("invalid UUID in job label", zap.Error(err))
return
}
d.markComplete(id)
d.markNotRunning(id, "OnUpdate")

}

// trackJob is called by the k8s informer callbacks to update job state.
func (d *Deduper) trackJob(job *batchv1.Job) {
id, err := uuid.Parse(job.Labels[config.UUIDLabel])
// OnDelete is called by k8s to inform us a resource is deleted.
func (d *Deduper) OnDelete(prev any) {
// The job condition at the point of deletion could be non-terminal, so
// a deleted job that was not yet finished is still marked as not running.
prevState, _ := prev.(*batchv1.Job)
if prevState == nil {
return
}
// If the last job state observed before the delete was finished, then
// OnUpdate should have already marked it as complete.
if model.JobFinished(prevState) {
return
}
id, err := uuid.Parse(prevState.Labels[config.UUIDLabel])
if err != nil {
d.logger.Error("invalid UUID in job label", zap.Error(err))
return
}
if model.JobFinished(job) {
d.markComplete(id)
} else {
d.markRunning(id)
}
d.markNotRunning(id, "OnDelete")
}

// markRunning records a job as in-flight.
func (d *Deduper) markRunning(id uuid.UUID) {
func (d *Deduper) markRunning(id uuid.UUID, source string) {
// Change state from not in-flight to in-flight.
numInFlight, ok := d.casa(id, true)
if !ok {
d.logger.Debug("markRunning: job was already in-flight!",
zap.String("uuid", id.String()),
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
return
Expand All @@ -156,24 +173,27 @@ func (d *Deduper) markRunning(id uuid.UUID) {
d.logger.Debug(
"markRunning: added previously unknown in-flight job",
zap.String("uuid", id.String()),
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
}

// markComplete records a job as not in-flight.
func (d *Deduper) markComplete(id uuid.UUID) {
// markNotRunning records a job as not in-flight.
func (d *Deduper) markNotRunning(id uuid.UUID, source string) {
// Change state from in-flight to not in-flight.
numInFlight, ok := d.casa(id, false)
if !ok {
d.logger.Debug("markComplete: job was already not-in-flight!",
d.logger.Debug("markNotRunning: job was already not-in-flight!",
zap.String("uuid", id.String()),
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
return
}

d.logger.Debug("markComplete: job complete",
d.logger.Debug("markNotRunning: job complete",
zap.String("uuid", id.String()),
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
}
Expand All @@ -187,6 +207,7 @@ func (d *Deduper) markComplete(id uuid.UUID) {
func (d *Deduper) casa(id uuid.UUID, x bool) (int, bool) {
d.inFlightMu.Lock()
defer d.inFlightMu.Unlock()

if d.inFlight[id] == x {
return len(d.inFlight), false
}
Expand Down

0 comments on commit 14a52fc

Please sign in to comment.