Skip to content

Commit

Permalink
more logging + retry on pop
Browse files Browse the repository at this point in the history
  • Loading branch information
jpoz committed Jun 27, 2024
1 parent c6ac2b0 commit 340e216
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
14 changes: 13 additions & 1 deletion storage/pop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,24 @@ func (s *RedisHandler) Pop(ctx context.Context, queues ...string) (*wire.Job, er
queueKeys[i] = s.QueueKey(queue)
}

var payload []string
var err error

// TODO: configure the timeout here
payload, err := s.rdb.BRPop(ctx, 10*time.Second, queueKeys...).Result()
// TODO: configure retries here
for attempt := 0; attempt < 10; attempt++ {
payload, err = s.rdb.BRPop(ctx, 2*time.Second, queueKeys...).Result()
if err == nil {
break
}
s.log.Error("pop job error", "error", err, "attempt", attempt+1)
time.Sleep(1 * time.Second) // Wait a bit before retrying
}
if err == redis.Nil {
return nil, nil
}
if err != nil {
s.log.Error("fatal error to pop job", "error", err)
return nil, err
}

Expand Down
4 changes: 3 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (w *Worker) Run(ctx context.Context) error {
}

for {
slog.Info("Waiting for next job")
job, err := w.handler.Pop(ctx, w.registeredFullNames...)
if err != nil {
slog.Error("failed to pop job", "error", err)
Expand All @@ -287,9 +288,10 @@ func (w *Worker) Run(ctx context.Context) error {
if job == nil {
select {
case <-ctx.Done():
slog.Info("Stopping worker")
slog.Info("Stopping worker(s)")
close(jobsChan) // Close the channel to stop workers
wg.Wait() // Wait for all workers to finish processing
slog.Info("Stopped worker(s)")
return nil
default:
}
Expand Down

0 comments on commit 340e216

Please sign in to comment.