From 8124b4ea9bbbe0f19b80543aa359cf5b15d5e9d0 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 20 Mar 2024 11:50:16 +0545 Subject: [PATCH] fix: handling failed events --- async_consumer.go | 8 +------- event.go | 16 +++++++++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/async_consumer.go b/async_consumer.go index 27aecfe..3a1b2eb 100644 --- a/async_consumer.go +++ b/async_consumer.go @@ -37,13 +37,7 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) { } failedEvents := t.Consumer(ctx, events) - - for i := range failedEvents { - e := failedEvents[i] - e.Attempts += 1 - } - - if err := failedEvents.Update(ctx, tx.Conn()); err != nil { + if err := failedEvents.Recreate(ctx, tx.Conn()); err != nil { ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err) } diff --git a/event.go b/event.go index 37f7112..591c175 100644 --- a/event.go +++ b/event.go @@ -19,6 +19,7 @@ type Event struct { LastAttempt *time.Time `json:"last_attempt"` Properties map[string]string `json:"properties"` CreatedAt time.Time `json:"created_at"` + Priority int `json:"priority"` } func (t Event) TableName() string { @@ -39,6 +40,7 @@ func (t *Event) Scan(rows pgx.Row) error { &t.Error, &t.LastAttempt, &t.Attempts, + &t.Priority, ) if err != nil { return err @@ -49,16 +51,20 @@ func (t *Event) Scan(rows pgx.Row) error { type Events []Event -// Update updates the events in batches. -func (events Events) Update(ctx Context, tx *pgx.Conn) error { +// Recreate creates the given failed events in batches after updating the +// attempts count. +func (events Events) Recreate(ctx Context, tx *pgx.Conn) error { if len(events) == 0 { return nil } var batch pgx.Batch for _, event := range events { - query := `UPDATE event_queue SET error=$1, attempts=$2, last_attempt=NOW() WHERE id=$3` - batch.Queue(query, event.Error, event.Attempts, event.ID) + attempts := event.Attempts + 1 + query := `INSERT INTO event_queue + (name, properties, error, last_attempt, attempts, priority) + VALUES($1, $2, $3, NOW(), $4, $5)` + batch.Queue(query, event.Name, event.Properties, event.Error, attempts, event.Priority) } br := tx.SendBatch(ctx, &batch) @@ -107,7 +113,7 @@ func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, op FOR UPDATE SKIP LOCKED LIMIT @batchSize ) - RETURNING id, name, created_at, properties, error, last_attempt, attempts + RETURNING id, name, created_at, properties, error, last_attempt, attempts, priority ` args := pgx.NamedArgs{