diff --git a/async_consumer.go b/async_consumer.go index c4b4d16..27aecfe 100644 --- a/async_consumer.go +++ b/async_consumer.go @@ -2,7 +2,6 @@ package postq import ( "fmt" - "log" ) // AsyncEventHandlerFunc processes multiple events and returns the failed ones @@ -45,8 +44,7 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) { } if err := failedEvents.Update(ctx, tx.Conn()); err != nil { - // TODO: More robust way to handle failed event insertion failures - log.Printf("error saving event attempt updates to event_queue: %v\n", err) + ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err) } return len(events), tx.Commit(ctx) diff --git a/context.go b/context.go index 9858741..cef56bb 100644 --- a/context.go +++ b/context.go @@ -9,4 +9,6 @@ import ( type Context interface { context.Context Pool() *pgxpool.Pool + Debugf(message string, args ...interface{}) + Tracef(message string, args ...interface{}) } diff --git a/event.go b/event.go index d40ee34..37f7112 100644 --- a/event.go +++ b/event.go @@ -2,6 +2,7 @@ package postq import ( "fmt" + "strings" "time" "github.com/google/uuid" @@ -11,14 +12,17 @@ import ( // Event represents the event queue table. // The table must have the following fields. type Event struct { - ID uuid.UUID `json:"id"` - Name string `json:"name"` - Error *string `json:"error"` - Attempts int `json:"attempts"` - LastAttempt *time.Time `json:"last_attempt"` - - Properties map[string]string `json:"properties"` - CreatedAt time.Time `json:"created_at"` + ID uuid.UUID `json:"id"` + Name string `json:"name"` + Error *string `json:"error"` + Attempts int `json:"attempts"` + LastAttempt *time.Time `json:"last_attempt"` + Properties map[string]string `json:"properties"` + CreatedAt time.Time `json:"created_at"` +} + +func (t Event) TableName() string { + return "event_queue" } func (t *Event) SetError(err string) { @@ -88,11 +92,14 @@ type EventFetcherOption struct { // fetchEvents fetches given watch events from the `event_queue` table. func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, opts *EventFetcherOption) ([]Event, error) { + if batchSize == 0 { + batchSize = 1 + } const selectEventsQuery = ` DELETE FROM event_queue WHERE id IN ( SELECT id FROM event_queue - WHERE + WHERE attempts <= @maxAttempts AND name = ANY(@events) AND (last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponent)) @@ -145,5 +152,8 @@ func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, op return nil, fmt.Errorf("error iterating rows: %w", rows.Err()) } + if len(events) > 0 { + ctx.Tracef("%s %d events fetched", strings.Join(watchEvents, ","), len(events)) + } return events, nil } diff --git a/go.mod b/go.mod index 102bbfe..75982b8 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,17 @@ module github.com/flanksource/postq go 1.20 require ( - github.com/google/uuid v1.3.1 - github.com/jackc/pgx/v5 v5.3.1 + github.com/google/uuid v1.5.0 + github.com/jackc/pgx/v5 v5.5.1 github.com/sethvargo/go-retry v0.2.4 ) require ( github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/stretchr/testify v1.8.4 // indirect - golang.org/x/crypto v0.12.0 // indirect - golang.org/x/sync v0.3.0 // indirect - golang.org/x/text v0.12.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.5.0 // indirect + golang.org/x/text v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index ce9ddd4..78b39b5 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,15 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU= -github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8= -github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk= -github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= +github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI= +github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= @@ -19,12 +19,12 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pg_consumer.go b/pg_consumer.go index f6be4cd..aec991f 100644 --- a/pg_consumer.go +++ b/pg_consumer.go @@ -20,7 +20,7 @@ type PGConsumer struct { consumerFunc ConsumerFunc // handle errors when consuming. - errorHandler func(e error) bool + errorHandler func(ctx Context, e error) bool } type ConsumerOption struct { @@ -34,8 +34,8 @@ type ConsumerOption struct { // handle errors when consuming. // returns whether to retry or not. - // default: sleep for 5 seconds and retry. - ErrorHandler func(err error) bool + // default: sleep for 1s and retry. + ErrorHandler func(ctx Context, e error) bool } // NewPGConsumer returns a new EventConsumer @@ -63,6 +63,7 @@ func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, if opt.ErrorHandler != nil { ec.errorHandler = opt.ErrorHandler } + } return ec, nil @@ -73,7 +74,7 @@ func (t *PGConsumer) ConsumeUntilEmpty(ctx Context) { for { count, err := t.consumerFunc(ctx) if err != nil { - if !t.errorHandler(err) { + if !t.errorHandler(ctx, err) { return } } else if count == 0 { @@ -99,7 +100,8 @@ func (e *PGConsumer) Listen(ctx Context, pgNotify <-chan string) { } } -func defaultErrorHandler(_ error) bool { - time.Sleep(time.Second * 5) +func defaultErrorHandler(ctx Context, e error) bool { + time.Sleep(time.Second) + ctx.Debugf("default error: %v", e) return true } diff --git a/sync_consumer.go b/sync_consumer.go index 41cab48..0d14c38 100644 --- a/sync_consumer.go +++ b/sync_consumer.go @@ -2,7 +2,6 @@ package postq import ( "fmt" - "log" ) // SyncEventHandlerFunc processes a single event and ONLY makes db changes. @@ -38,7 +37,7 @@ func (t *SyncEventConsumer) Handle(ctx Context) (int, error) { event.SetError(err.Error()) const query = `UPDATE event_queue SET error=$1, attempts=$2, last_attempt=NOW() WHERE id=$3` if _, err := ctx.Pool().Exec(ctx, query, event.Error, event.Attempts, event.ID); err != nil { - log.Printf("error saving event attempt updates to event_queue: %v\n", err) + ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err) } }