From a5519bdc8db1b9b45c306da79e2d14802253b8ae Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Fri, 23 Aug 2024 15:12:23 +0200 Subject: [PATCH 1/5] fix: missing metadata on channel udpate --- internal/notification/service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/notification/service.go b/internal/notification/service.go index ae2605356..df101ffcd 100644 --- a/internal/notification/service.go +++ b/internal/notification/service.go @@ -289,6 +289,10 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) ( CustomHeaders: headers, Disabled: channel.Disabled, Secret: &channel.Config.WebHook.SigningSecret, + Metadata: map[string]string{ + ChannelIDMetadataKey: channel.ID, + }, + Description: convert.ToPointer("Notification Channel: " + channel.ID), }) if err != nil { return nil, fmt.Errorf("failed to update webhook for channel: %w", err) From 367dc2fcc5a05e88bf5f675ef3d9faac75f5c840 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Fri, 23 Aug 2024 15:32:43 +0200 Subject: [PATCH 2/5] fix: wrap validation errors --- internal/notification/webhook/webhook.go | 62 ++++++++++++++++++------ 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/internal/notification/webhook/webhook.go b/internal/notification/webhook/webhook.go index dccc7556e..210e2f703 100644 --- a/internal/notification/webhook/webhook.go +++ b/internal/notification/webhook/webhook.go @@ -70,16 +70,22 @@ type CreateWebhookInput struct { func (i CreateWebhookInput) Validate() error { if i.Namespace == "" { - return errors.New("namespace is required") + return ValidationError{ + Err: errors.New("namespace is required"), + } } if i.URL == "" { - return errors.New("url is required") + return ValidationError{ + Err: errors.New("url is required"), + } } - if i.Secret != nil { + if i.Secret != nil && *i.Secret != "" { if err := ValidateSigningSecret(*i.Secret); err != nil { - return fmt.Errorf("invalid secret: %w", err) + return ValidationError{ + Err: fmt.Errorf("invalid secret: %w", err), + } } } @@ -105,23 +111,33 @@ type UpdateWebhookInput struct { func (i UpdateWebhookInput) Validate() error { if i.Namespace == "" { - return errors.New("namespace is required") + return ValidationError{ + Err: errors.New("namespace is required"), + } } if i.ID == "" { - return errors.New("id is required") + return ValidationError{ + Err: errors.New("id is required"), + } } if i.URL == "" { - return errors.New("url is required") + return ValidationError{ + Err: errors.New("url is required"), + } } if i.Secret == nil { - return errors.New("secret is required") + return ValidationError{ + Err: errors.New("secret is required"), + } } else { secret, _ := strings.CutPrefix(*i.Secret, SigningSecretPrefix) if _, err := base64.StdEncoding.DecodeString(secret); err != nil { - return errors.New("invalid secret: must be base64 encoded") + return ValidationError{ + Err: errors.New("invalid secret: must be base64 encoded"), + } } } @@ -140,11 +156,15 @@ type UpdateWebhookChannelsInput struct { func (i UpdateWebhookChannelsInput) Validate() error { if i.Namespace == "" { - return errors.New("namespace is required") + return ValidationError{ + Err: errors.New("namespace is required"), + } } if i.ID == "" { - return errors.New("id is required") + return ValidationError{ + Err: errors.New("id is required"), + } } return nil @@ -160,11 +180,15 @@ type GetWebhookInput struct { func (i GetWebhookInput) Validate() error { if i.Namespace == "" { - return errors.New("namespace is required") + return ValidationError{ + Err: errors.New("namespace is required"), + } } if i.ID == "" { - return errors.New("id is required") + return ValidationError{ + Err: errors.New("id is required"), + } } return nil @@ -195,15 +219,21 @@ type SendMessageInput struct { func (i SendMessageInput) Validate() error { if i.Namespace == "" { - return errors.New("namespace is required") + return ValidationError{ + Err: errors.New("namespace is required"), + } } if i.EventType == "" { - return errors.New("event type is required") + return ValidationError{ + Err: errors.New("event type is required"), + } } if len(i.Payload) == 0 { - return errors.New("payload must not be empty") + return ValidationError{ + Err: errors.New("payload must not be empty"), + } } return nil From e7dc315feba87ecfb38d5c6537081860c9193599 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Fri, 23 Aug 2024 15:51:57 +0200 Subject: [PATCH 3/5] feat: add debug logging to notification service --- internal/notification/service.go | 53 ++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/internal/notification/service.go b/internal/notification/service.go index df101ffcd..c08324a3f 100644 --- a/internal/notification/service.go +++ b/internal/notification/service.go @@ -146,12 +146,23 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) ( return nil, fmt.Errorf("invalid params: %w", err) } + logger := c.logger.WithGroup("channel").With( + "operation", "create", + "namespace", params.Namespace, + ) + + logger.Debug("creating channel", "type", params.Type) + txFunc := func(ctx context.Context, repo TxRepository) (*Channel, error) { channel, err := repo.CreateChannel(ctx, params) if err != nil { return nil, fmt.Errorf("failed to create channel: %w", err) } + logger = logger.With("id", channel.ID) + + logger.Debug("channel stored in repository") + switch params.Type { case ChannelTypeWebhook: var headers map[string]string @@ -177,6 +188,8 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) ( return nil, fmt.Errorf("failed to create webhook for channel: %w", err) } + logger.Debug("webhook is created") + updateIn := UpdateChannelInput{ NamespacedModel: channel.NamespacedModel, Type: channel.Type, @@ -191,6 +204,7 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) ( if err != nil { return nil, fmt.Errorf("failed to update channel: %w", err) } + logger.Debug("channel is updated in database with webhook configuration") default: return nil, fmt.Errorf("invalid channel type: %s", channel.Type) } @@ -206,6 +220,14 @@ func (c service) DeleteChannel(ctx context.Context, params DeleteChannelInput) e return fmt.Errorf("invalid params: %w", err) } + logger := c.logger.WithGroup("channel").With( + "operation", "delete", + "id", params.ID, + "namespace", params.Namespace, + ) + + logger.Debug("deleting channel") + rules, err := c.repo.ListRules(ctx, ListRulesInput{ Namespaces: []string{params.Namespace}, IncludeDisabled: true, @@ -235,6 +257,8 @@ func (c service) DeleteChannel(ctx context.Context, params DeleteChannelInput) e return fmt.Errorf("failed to delete webhook: %w", err) } + logger.Debug("webhook associated with channel deleted") + return repo.DeleteChannel(ctx, params) } @@ -254,6 +278,14 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) ( return nil, fmt.Errorf("invalid params: %w", err) } + logger := c.logger.WithGroup("channel").With( + "operation", "update", + "id", params.ID, + "namespace", params.Namespace, + ) + + logger.Debug("updating channel") + channel, err := c.repo.GetChannel(ctx, GetChannelInput{ ID: params.ID, Namespace: params.Namespace, @@ -274,6 +306,8 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) ( return nil, fmt.Errorf("failed to create channel: %w", err) } + logger.Debug("channel updated in repository") + switch params.Type { case ChannelTypeWebhook: var headers map[string]string @@ -297,6 +331,9 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) ( if err != nil { return nil, fmt.Errorf("failed to update webhook for channel: %w", err) } + + logger.Debug("webhook is updated") + default: return nil, fmt.Errorf("invalid channel type: %s", channel.Type) } @@ -320,6 +357,13 @@ func (c service) CreateRule(ctx context.Context, params CreateRuleInput) (*Rule, return nil, fmt.Errorf("invalid params: %w", err) } + logger := c.logger.WithGroup("rule").With( + "operation", "create", + "namespace", params.Namespace, + ) + + logger.Debug("creating rule", "type", params.Type) + txFunc := func(ctx context.Context, repo TxRepository) (*Rule, error) { rule, err := repo.CreateRule(ctx, params) if err != nil { @@ -404,6 +448,8 @@ func (c service) UpdateRule(ctx context.Context, params UpdateRuleInput) (*Rule, "namespace", params.Namespace, ) + logger.Debug("updating rule") + rule, err := c.repo.GetRule(ctx, GetRuleInput{ ID: params.ID, Namespace: params.Namespace, @@ -511,6 +557,13 @@ func (c service) CreateEvent(ctx context.Context, params CreateEventInput) (*Eve return nil, fmt.Errorf("invalid params: %w", err) } + logger := c.logger.WithGroup("event").With( + "operation", "create", + "namespace", params.Namespace, + ) + + logger.Debug("creating event") + rule, err := c.repo.GetRule(ctx, GetRuleInput{ Namespace: params.Namespace, ID: params.RuleID, From d76d9143a6d82f9318edbcbfc19a4243ab985f94 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Fri, 23 Aug 2024 16:18:30 +0200 Subject: [PATCH 4/5] fix: return event timestamps in UTC --- internal/notification/repository/entitymapping.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/notification/repository/entitymapping.go b/internal/notification/repository/entitymapping.go index af38a674d..29fb7c844 100644 --- a/internal/notification/repository/entitymapping.go +++ b/internal/notification/repository/entitymapping.go @@ -104,7 +104,7 @@ func EventFromDBEntity(e db.NotificationEvent) (*notification.Event, error) { }, ID: e.ID, Type: e.Type, - CreatedAt: e.CreatedAt, + CreatedAt: e.CreatedAt.UTC(), Payload: payload, Rule: *rule, DeliveryStatus: statuses, @@ -122,7 +122,7 @@ func EventDeliveryStatusFromDBEntity(e db.NotificationEventDeliveryStatus) *noti EventID: e.EventID, State: e.State, Reason: e.Reason, - CreatedAt: e.CreatedAt, - UpdatedAt: e.UpdatedAt, + CreatedAt: e.CreatedAt.UTC(), + UpdatedAt: e.UpdatedAt.UTC(), } } From ba7da2a84c06b59cb99144c662a93e062f69d5ce Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Fri, 23 Aug 2024 16:19:11 +0200 Subject: [PATCH 5/5] fix: notification event ordering --- internal/notification/httpdriver/event.go | 4 +++- internal/notification/repository/repository.go | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/notification/httpdriver/event.go b/internal/notification/httpdriver/event.go index cfbdd6e04..fc7273a9c 100644 --- a/internal/notification/httpdriver/event.go +++ b/internal/notification/httpdriver/event.go @@ -13,6 +13,7 @@ import ( "github.com/openmeterio/openmeter/pkg/framework/transport/httptransport" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/pagination" + "github.com/openmeterio/openmeter/pkg/sortx" ) type ( @@ -32,7 +33,8 @@ func (h *handler) ListEvents() ListEventsHandler { req := ListEventsRequest{ Namespaces: []string{ns}, - OrderBy: defaultx.WithDefault(params.OrderBy, notification.EventOrderByID), + Order: sortx.Order(defaultx.WithDefault(params.Order, api.ListNotificationEventsParamsOrderSortOrderDESC)), + OrderBy: defaultx.WithDefault(params.OrderBy, notification.EventOrderByCreatedAt), Page: pagination.Page{ PageSize: defaultx.WithDefault(params.PageSize, notification.DefaultPageSize), PageNumber: defaultx.WithDefault(params.Page, notification.DefaultPageNumber), diff --git a/internal/notification/repository/repository.go b/internal/notification/repository/repository.go index b25e646e5..343fa7c9b 100644 --- a/internal/notification/repository/repository.go +++ b/internal/notification/repository/repository.go @@ -532,18 +532,18 @@ func (r repository) ListEvents(ctx context.Context, params notification.ListEven query.WithChannels() }) - order := entutils.GetOrdering(sortx.OrderDefault) + order := entutils.GetOrdering(sortx.OrderDesc) if !params.Order.IsDefaultValue() { order = entutils.GetOrdering(params.Order) } switch params.OrderBy { - case notification.EventOrderByCreatedAt: - query = query.Order(eventdb.ByCreatedAt(order...)) case notification.EventOrderByID: + query = query.Order(eventdb.ByID(order...)) + case notification.EventOrderByCreatedAt: fallthrough default: - query = query.Order(eventdb.ByID(order...)) + query = query.Order(eventdb.ByCreatedAt(order...)) } response := pagination.PagedResponse[notification.Event]{