Skip to content

Commit

Permalink
feat: improve log handling
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Jan 1, 2024
1 parent 6bb5a81 commit 33b24a2
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 41 deletions.
4 changes: 1 addition & 3 deletions async_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package postq

import (
"fmt"
"log"
)

// AsyncEventHandlerFunc processes multiple events and returns the failed ones
Expand Down Expand Up @@ -45,8 +44,7 @@ func (t *AsyncEventConsumer) Handle(ctx Context) (int, error) {
}

if err := failedEvents.Update(ctx, tx.Conn()); err != nil {
// TODO: More robust way to handle failed event insertion failures
log.Printf("error saving event attempt updates to event_queue: %v\n", err)
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
}

return len(events), tx.Commit(ctx)
Expand Down
2 changes: 2 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ import (
type Context interface {
context.Context
Pool() *pgxpool.Pool
Debugf(message string, args ...interface{})
Tracef(message string, args ...interface{})
}
28 changes: 19 additions & 9 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package postq

import (
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -11,14 +12,17 @@ import (
// Event represents the event queue table.
// The table must have the following fields.
type Event struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Error *string `json:"error"`
Attempts int `json:"attempts"`
LastAttempt *time.Time `json:"last_attempt"`

Properties map[string]string `json:"properties"`
CreatedAt time.Time `json:"created_at"`
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Error *string `json:"error"`
Attempts int `json:"attempts"`
LastAttempt *time.Time `json:"last_attempt"`
Properties map[string]string `json:"properties"`
CreatedAt time.Time `json:"created_at"`
}

func (t Event) TableName() string {
return "event_queue"
}

func (t *Event) SetError(err string) {
Expand Down Expand Up @@ -88,11 +92,14 @@ type EventFetcherOption struct {

// fetchEvents fetches given watch events from the `event_queue` table.
func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, opts *EventFetcherOption) ([]Event, error) {
if batchSize == 0 {
batchSize = 1
}
const selectEventsQuery = `
DELETE FROM event_queue
WHERE id IN (
SELECT id FROM event_queue
WHERE
WHERE
attempts <= @maxAttempts AND
name = ANY(@events) AND
(last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponent))
Expand Down Expand Up @@ -145,5 +152,8 @@ func fetchEvents(ctx Context, tx pgx.Tx, watchEvents []string, batchSize int, op
return nil, fmt.Errorf("error iterating rows: %w", rows.Err())
}

if len(events) > 0 {
ctx.Tracef("%s %d events fetched", strings.Join(watchEvents, ","), len(events))
}
return events, nil
}
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ module github.com/flanksource/postq
go 1.20

require (
github.com/google/uuid v1.3.1
github.com/jackc/pgx/v5 v5.3.1
github.com/google/uuid v1.5.0
github.com/jackc/pgx/v5 v5.5.1
github.com/sethvargo/go-retry v0.2.4
)

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.3.1 h1:Fcr8QJ1ZeLi5zsPZqQeUZhNhxfkkKBOgJuYkJHoBOtU=
github.com/jackc/pgx/v5 v5.3.1/go.mod h1:t3JDKnCBlYIc0ewLF0Q7B8MXmoIaBOZj/ic7iHozM/8=
github.com/jackc/puddle/v2 v2.2.0 h1:RdcDk92EJBuBS55nQMMYFXTxwstHug4jkhT5pq8VxPk=
github.com/jackc/puddle/v2 v2.2.0/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
Expand All @@ -19,12 +19,12 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
14 changes: 8 additions & 6 deletions pg_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type PGConsumer struct {
consumerFunc ConsumerFunc

// handle errors when consuming.
errorHandler func(e error) bool
errorHandler func(ctx Context, e error) bool
}

type ConsumerOption struct {
Expand All @@ -34,8 +34,8 @@ type ConsumerOption struct {

// handle errors when consuming.
// returns whether to retry or not.
// default: sleep for 5 seconds and retry.
ErrorHandler func(err error) bool
// default: sleep for 1s and retry.
ErrorHandler func(ctx Context, e error) bool
}

// NewPGConsumer returns a new EventConsumer
Expand Down Expand Up @@ -63,6 +63,7 @@ func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer,
if opt.ErrorHandler != nil {
ec.errorHandler = opt.ErrorHandler
}

}

return ec, nil
Expand All @@ -73,7 +74,7 @@ func (t *PGConsumer) ConsumeUntilEmpty(ctx Context) {
for {
count, err := t.consumerFunc(ctx)
if err != nil {
if !t.errorHandler(err) {
if !t.errorHandler(ctx, err) {
return
}
} else if count == 0 {
Expand All @@ -99,7 +100,8 @@ func (e *PGConsumer) Listen(ctx Context, pgNotify <-chan string) {
}
}

func defaultErrorHandler(_ error) bool {
time.Sleep(time.Second * 5)
func defaultErrorHandler(ctx Context, e error) bool {
time.Sleep(time.Second)
ctx.Debugf("default error: %v", e)
return true
}
3 changes: 1 addition & 2 deletions sync_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package postq

import (
"fmt"
"log"
)

// SyncEventHandlerFunc processes a single event and ONLY makes db changes.
Expand Down Expand Up @@ -38,7 +37,7 @@ func (t *SyncEventConsumer) Handle(ctx Context) (int, error) {
event.SetError(err.Error())
const query = `UPDATE event_queue SET error=$1, attempts=$2, last_attempt=NOW() WHERE id=$3`
if _, err := ctx.Pool().Exec(ctx, query, event.Error, event.Attempts, event.ID); err != nil {
log.Printf("error saving event attempt updates to event_queue: %v\n", err)
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
}
}

Expand Down

0 comments on commit 33b24a2

Please sign in to comment.