diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go index 5fc867e4..1aa629c3 100644 --- a/internal/controller/monitor/monitor.go +++ b/internal/controller/monitor/monitor.go @@ -7,6 +7,7 @@ import ( "fmt" "math/rand/v2" "reflect" + "sync" "time" "github.com/Khan/genqlient/graphql" @@ -208,8 +209,13 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger, jobsCh := make(chan *api.JobJobTypeCommand) defer close(jobsCh) + var wg sync.WaitGroup for range min(m.cfg.JobCreationConcurrency, len(jobs)) { - go jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh) + wg.Add(1) + go func() { + defer wg.Done() + jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh) + }() } for _, job := range jobs { @@ -221,6 +227,8 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger, case jobsCh <- job: } } + + wg.Wait() } func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler model.JobHandler, agentTags map[string]string, jobsCh <-chan *api.JobJobTypeCommand) {