diff --git a/storage/pop.go b/storage/pop.go index a4ba84c..3d08f0f 100644 --- a/storage/pop.go +++ b/storage/pop.go @@ -32,12 +32,24 @@ func (s *RedisHandler) Pop(ctx context.Context, queues ...string) (*wire.Job, er queueKeys[i] = s.QueueKey(queue) } + var payload []string + var err error + // TODO: configure the timeout here - payload, err := s.rdb.BRPop(ctx, 10*time.Second, queueKeys...).Result() + // TODO: configure retries here + for attempt := 0; attempt < 10; attempt++ { + payload, err = s.rdb.BRPop(ctx, 2*time.Second, queueKeys...).Result() + if err == nil { + break + } + s.log.Error("pop job error", "error", err, "attempt", attempt+1) + time.Sleep(1 * time.Second) // Wait a bit before retrying + } if err == redis.Nil { return nil, nil } if err != nil { + s.log.Error("fatal error to pop job", "error", err) return nil, err } diff --git a/worker.go b/worker.go index 316e0a9..27efc24 100644 --- a/worker.go +++ b/worker.go @@ -278,6 +278,7 @@ func (w *Worker) Run(ctx context.Context) error { } for { + slog.Info("Waiting for next job") job, err := w.handler.Pop(ctx, w.registeredFullNames...) if err != nil { slog.Error("failed to pop job", "error", err) @@ -287,9 +288,10 @@ func (w *Worker) Run(ctx context.Context) error { if job == nil { select { case <-ctx.Done(): - slog.Info("Stopping worker") + slog.Info("Stopping worker(s)") close(jobsChan) // Close the channel to stop workers wg.Wait() // Wait for all workers to finish processing + slog.Info("Stopped worker(s)") return nil default: }