Skip to content

Commit

Permalink
Merge pull request #1369 from openmeterio/refactor/add-proper-dlq
Browse files Browse the repository at this point in the history
feat: simplify dlq setup
  • Loading branch information
turip authored Aug 15, 2024
2 parents fbf29c2 + 8831608 commit 9793b8a
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 175 deletions.
3 changes: 2 additions & 1 deletion cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func main() {
Publisher: eventPublisherDriver,
Logger: logger,

DLQ: conf.BalanceWorker.DLQ,
Config: conf.BalanceWorker.ConsumerConfiguration,
},

EventBus: eventPublisher,
Expand Down Expand Up @@ -341,6 +341,7 @@ func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerO
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.BalanceWorker.DLQ.Topic,
NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
})
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func main() {
Publisher: eventPublisherDriver,
Logger: logger,

DLQ: conf.Notification.Consumer.DLQ,
Config: conf.Notification.Consumer,
},
Marshaler: eventPublisher.Marshaler(),

Expand Down Expand Up @@ -334,6 +334,7 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.Notification.Consumer.DLQ.Topic,
NumPartitions: int32(conf.Notification.Consumer.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
})
}

Expand Down
33 changes: 4 additions & 29 deletions config/balanceworker.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,23 @@
package config

import (
"errors"
"fmt"
"time"

"github.com/spf13/viper"
)

type BalanceWorkerConfiguration struct {
DLQ DLQConfiguration
Retry RetryConfiguration
ConsumerGroupName string
ConsumerConfiguration `mapstructure:",squash"`
}

func (c BalanceWorkerConfiguration) Validate() error {
if err := c.DLQ.Validate(); err != nil {
return fmt.Errorf("poision queue: %w", err)
}

if err := c.Retry.Validate(); err != nil {
return fmt.Errorf("retry: %w", err)
}

if c.ConsumerGroupName == "" {
return errors.New("consumer group name is required")
if err := c.ConsumerConfiguration.Validate(); err != nil {
return err
}

return nil
}

func ConfigureBalanceWorker(v *viper.Viper) {
v.SetDefault("balanceWorker.dlq.enabled", true)
ConfigureConsumer(v, "balanceWorker")
v.SetDefault("balanceWorker.dlq.topic", "om_sys.balance_worker_dlq")
v.SetDefault("balanceWorker.dlq.autoProvision.enabled", true)
v.SetDefault("balanceWorker.dlq.autoProvision.partitions", 1)

v.SetDefault("balanceWorker.dlq.throttle.enabled", true)
// Let's throttle poision queue to 10 messages per second
v.SetDefault("balanceWorker.dlq.throttle.count", 10)
v.SetDefault("balanceWorker.dlq.throttle.duration", time.Second)

v.SetDefault("balanceWorker.retry.maxRetries", 5)
v.SetDefault("balanceWorker.retry.initialInterval", 100*time.Millisecond)

v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker")
}
50 changes: 24 additions & 26 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,44 +197,42 @@ func TestComplete(t *testing.T) {
},
},
BalanceWorker: BalanceWorkerConfiguration{
DLQ: DLQConfiguration{
Enabled: true,
Topic: "om_sys.balance_worker_dlq",
AutoProvision: AutoProvisionConfiguration{
Enabled: true,
Partitions: 1,
ConsumerConfiguration: ConsumerConfiguration{
ProcessingTimeout: 30 * time.Second,
Retry: RetryConfiguration{
InitialInterval: 10 * time.Millisecond,
MaxInterval: time.Second,
MaxElapsedTime: time.Minute,
},
Throttle: ThrottleConfiguration{
Enabled: true,
Count: 10,
Duration: time.Second,
DLQ: DLQConfiguration{
Enabled: true,
Topic: "om_sys.balance_worker_dlq",
AutoProvision: DLQAutoProvisionConfiguration{
Enabled: true,
Partitions: 1,
Retention: 90 * 24 * time.Hour,
},
},
ConsumerGroupName: "om_balance_worker",
},
Retry: RetryConfiguration{
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
},
ConsumerGroupName: "om_balance_worker",
},
Notification: NotificationConfiguration{
Enabled: true,
Consumer: NotificationConsumerConfiguration{
Consumer: ConsumerConfiguration{
ProcessingTimeout: 30 * time.Second,
Retry: RetryConfiguration{
InitialInterval: 10 * time.Millisecond,
MaxInterval: time.Second,
MaxElapsedTime: time.Minute,
},
DLQ: DLQConfiguration{
Enabled: true,
Topic: "om_sys.notification_service_dlq",
AutoProvision: AutoProvisionConfiguration{
AutoProvision: DLQAutoProvisionConfiguration{
Enabled: true,
Partitions: 1,
Retention: 90 * 24 * time.Hour,
},
Throttle: ThrottleConfiguration{
Enabled: true,
Count: 10,
Duration: time.Second,
},
},
Retry: RetryConfiguration{
MaxRetries: 5,
InitialInterval: 100 * time.Millisecond,
},
ConsumerGroupName: "om_notification_service",
},
Expand Down
99 changes: 80 additions & 19 deletions config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func (c EventSubsystemConfiguration) Validate() error {
}

type AutoProvisionConfiguration struct {
Enabled bool
Partitions int
Enabled bool
Partitions int
DLQRetention time.Duration
}

func (c AutoProvisionConfiguration) Validate() error {
Expand All @@ -48,11 +49,44 @@ func (c AutoProvisionConfiguration) Validate() error {
return nil
}

type ConsumerConfiguration struct {
// ProcessingTimeout is the maximum time a message is allowed to be processed before it is considered failed. 0 disables the timeout.
ProcessingTimeout time.Duration

// Retry specifies how many times a message should be retried before it is sent to the DLQ.
Retry RetryConfiguration

// ConsumerGroupName is the name of the consumer group that the consumer belongs to.
ConsumerGroupName string

// DLQ specifies the configuration for the Dead Letter Queue.
DLQ DLQConfiguration
}

func (c ConsumerConfiguration) Validate() error {
if c.ProcessingTimeout < 0 {
return errors.New("processing timeout must be positive or 0")
}

if c.ConsumerGroupName == "" {
return errors.New("consumer group name is required")
}

if err := c.Retry.Validate(); err != nil {
return fmt.Errorf("retry configuration is invalid: %w", err)
}

if err := c.DLQ.Validate(); err != nil {
return fmt.Errorf("dlq configuration is invalid: %w", err)
}

return nil
}

type DLQConfiguration struct {
Enabled bool
Topic string
AutoProvision AutoProvisionConfiguration
Throttle ThrottleConfiguration
AutoProvision DLQAutoProvisionConfiguration
}

func (c DLQConfiguration) Validate() error {
Expand All @@ -64,52 +98,79 @@ func (c DLQConfiguration) Validate() error {
return errors.New("topic name is required")
}

if err := c.Throttle.Validate(); err != nil {
return fmt.Errorf("throttle: %w", err)
if err := c.AutoProvision.Validate(); err != nil {
return fmt.Errorf("auto provision configuration is invalid: %w", err)
}

return nil
}

type ThrottleConfiguration struct {
Enabled bool
Count int64
Duration time.Duration
type DLQAutoProvisionConfiguration struct {
Enabled bool
Partitions int
Retention time.Duration
}

func (c ThrottleConfiguration) Validate() error {
func (c DLQAutoProvisionConfiguration) Validate() error {
if !c.Enabled {
return nil
}

if c.Count <= 0 {
return errors.New("count must be greater than 0")
if c.Partitions < 1 {
return errors.New("partitions must be greater than 0")
}

if c.Duration <= 0 {
return errors.New("duration must be greater than 0")
if c.Retention <= 0 {
return errors.New("retention must be greater than 0")
}

return nil
}

type RetryConfiguration struct {
MaxRetries int
// MaxRetries is maximum number of times a retry will be attempted. Disabled if 0
MaxRetries int
// InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier.
InitialInterval time.Duration
// MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval.
MaxInterval time.Duration
// MaxElapsedTime sets the time limit of how long retries will be attempted. Disabled if 0.
MaxElapsedTime time.Duration
}

func (c RetryConfiguration) Validate() error {
if c.MaxRetries <= 0 {
return errors.New("max retries must be greater than 0")
if c.MaxRetries < 0 {
return errors.New("max retries must be positive or 0")
}

if c.MaxElapsedTime < 0 {
return errors.New("max elapsed time must be positive or 0")
}

if c.InitialInterval <= 0 {
return errors.New("initial interval must be greater than 0")
}

if c.MaxInterval <= 0 {
return errors.New("max interval must be greater than 0")
}

return nil
}

func ConfigureConsumer(v *viper.Viper, prefix string) {
v.SetDefault(prefix+".processingTimeout", 30*time.Second)

v.SetDefault(prefix+".retry.maxRetries", 0)
v.SetDefault(prefix+".retry.initialInterval", 10*time.Millisecond)
v.SetDefault(prefix+".retry.maxInterval", time.Second)
v.SetDefault(prefix+".retry.maxElapsedTime", time.Minute)

v.SetDefault(prefix+".dlq.enabled", true)
v.SetDefault(prefix+".dlq.autoProvision.enabled", true)
v.SetDefault(prefix+".dlq.autoProvision.partitions", 1)
v.SetDefault(prefix+".dlq.autoProvision.retention", 90*24*time.Hour)
}

func ConfigureEvents(v *viper.Viper) {
// TODO: after the system events are fully implemented, we should enable them by default
v.SetDefault("events.enabled", false)
Expand Down
38 changes: 2 additions & 36 deletions config/notification.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package config

import (
"errors"
"fmt"
"time"

"github.com/spf13/viper"
)

type NotificationConfiguration struct {
Enabled bool
Consumer NotificationConsumerConfiguration
Consumer ConsumerConfiguration
}

func (c NotificationConfiguration) Validate() error {
Expand All @@ -20,40 +18,8 @@ func (c NotificationConfiguration) Validate() error {
return nil
}

type NotificationConsumerConfiguration struct {
DLQ DLQConfiguration
Retry RetryConfiguration
ConsumerGroupName string
}

func (c NotificationConsumerConfiguration) Validate() error {
if err := c.DLQ.Validate(); err != nil {
return fmt.Errorf("poision queue: %w", err)
}

if err := c.Retry.Validate(); err != nil {
return fmt.Errorf("retry: %w", err)
}

if c.ConsumerGroupName == "" {
return errors.New("consumer group name is required")
}
return nil
}

func ConfigureNotification(v *viper.Viper) {
v.SetDefault("notification.consumer.dlq.enabled", true)
ConfigureConsumer(v, "notification.consumer")
v.SetDefault("notification.consumer.dlq.topic", "om_sys.notification_service_dlq")
v.SetDefault("notification.consumer.dlq.autoProvision.enabled", true)
v.SetDefault("notification.consumer.dlq.autoProvision.partitions", 1)

v.SetDefault("notification.consumer.dlq.throttle.enabled", true)
// Let's throttle poison queue to 10 messages per second
v.SetDefault("notification.consumer.dlq.throttle.count", 10)
v.SetDefault("notification.consumer.dlq.throttle.duration", time.Second)

v.SetDefault("notification.consumer.retry.maxRetries", 5)
v.SetDefault("notification.consumer.retry.initialInterval", 100*time.Millisecond)

v.SetDefault("notification.consumer.consumerGroupName", "om_notification_service")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tetratelabs/wazero v1.6.0 // indirect
github.com/tilinna/z85 v1.0.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions internal/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,14 @@ func New(opts WorkerOptions) (*Worker, error) {
highWatermarkCache: highWatermarkCache,
}

eventHandler := worker.eventHandler()

router, err := router.NewDefaultRouter(opts.Router, eventHandler)
router, err := router.NewDefaultRouter(opts.Router)
if err != nil {
return nil, err
}

worker.router = router

eventHandler := worker.eventHandler()
router.AddNoPublisherHandler(
"balance_worker_system_events",
opts.SystemEventsTopic,
Expand Down
Loading

0 comments on commit 9793b8a

Please sign in to comment.