Skip to content

Commit

Permalink
Merge pull request #1424 from openmeterio/fix-notification-api
Browse files Browse the repository at this point in the history
fix: notification api
  • Loading branch information
chrisgacsal authored Aug 23, 2024
2 parents 7f5fb01 + ba7da2a commit 52cd526
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 24 deletions.
4 changes: 3 additions & 1 deletion internal/notification/httpdriver/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/pagination"
"github.com/openmeterio/openmeter/pkg/sortx"
)

type (
Expand All @@ -32,7 +33,8 @@ func (h *handler) ListEvents() ListEventsHandler {

req := ListEventsRequest{
Namespaces: []string{ns},
OrderBy: defaultx.WithDefault(params.OrderBy, notification.EventOrderByID),
Order: sortx.Order(defaultx.WithDefault(params.Order, api.ListNotificationEventsParamsOrderSortOrderDESC)),
OrderBy: defaultx.WithDefault(params.OrderBy, notification.EventOrderByCreatedAt),
Page: pagination.Page{
PageSize: defaultx.WithDefault(params.PageSize, notification.DefaultPageSize),
PageNumber: defaultx.WithDefault(params.Page, notification.DefaultPageNumber),
Expand Down
6 changes: 3 additions & 3 deletions internal/notification/repository/entitymapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func EventFromDBEntity(e db.NotificationEvent) (*notification.Event, error) {
},
ID: e.ID,
Type: e.Type,
CreatedAt: e.CreatedAt,
CreatedAt: e.CreatedAt.UTC(),
Payload: payload,
Rule: *rule,
DeliveryStatus: statuses,
Expand All @@ -122,7 +122,7 @@ func EventDeliveryStatusFromDBEntity(e db.NotificationEventDeliveryStatus) *noti
EventID: e.EventID,
State: e.State,
Reason: e.Reason,
CreatedAt: e.CreatedAt,
UpdatedAt: e.UpdatedAt,
CreatedAt: e.CreatedAt.UTC(),
UpdatedAt: e.UpdatedAt.UTC(),
}
}
8 changes: 4 additions & 4 deletions internal/notification/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,18 +532,18 @@ func (r repository) ListEvents(ctx context.Context, params notification.ListEven
query.WithChannels()
})

order := entutils.GetOrdering(sortx.OrderDefault)
order := entutils.GetOrdering(sortx.OrderDesc)
if !params.Order.IsDefaultValue() {
order = entutils.GetOrdering(params.Order)
}

switch params.OrderBy {
case notification.EventOrderByCreatedAt:
query = query.Order(eventdb.ByCreatedAt(order...))
case notification.EventOrderByID:
query = query.Order(eventdb.ByID(order...))
case notification.EventOrderByCreatedAt:
fallthrough
default:
query = query.Order(eventdb.ByID(order...))
query = query.Order(eventdb.ByCreatedAt(order...))
}

response := pagination.PagedResponse[notification.Event]{
Expand Down
57 changes: 57 additions & 0 deletions internal/notification/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,23 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) (
return nil, fmt.Errorf("invalid params: %w", err)
}

logger := c.logger.WithGroup("channel").With(
"operation", "create",
"namespace", params.Namespace,
)

logger.Debug("creating channel", "type", params.Type)

txFunc := func(ctx context.Context, repo TxRepository) (*Channel, error) {
channel, err := repo.CreateChannel(ctx, params)
if err != nil {
return nil, fmt.Errorf("failed to create channel: %w", err)
}

logger = logger.With("id", channel.ID)

logger.Debug("channel stored in repository")

switch params.Type {
case ChannelTypeWebhook:
var headers map[string]string
Expand All @@ -177,6 +188,8 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) (
return nil, fmt.Errorf("failed to create webhook for channel: %w", err)
}

logger.Debug("webhook is created")

updateIn := UpdateChannelInput{
NamespacedModel: channel.NamespacedModel,
Type: channel.Type,
Expand All @@ -191,6 +204,7 @@ func (c service) CreateChannel(ctx context.Context, params CreateChannelInput) (
if err != nil {
return nil, fmt.Errorf("failed to update channel: %w", err)
}
logger.Debug("channel is updated in database with webhook configuration")
default:
return nil, fmt.Errorf("invalid channel type: %s", channel.Type)
}
Expand All @@ -206,6 +220,14 @@ func (c service) DeleteChannel(ctx context.Context, params DeleteChannelInput) e
return fmt.Errorf("invalid params: %w", err)
}

logger := c.logger.WithGroup("channel").With(
"operation", "delete",
"id", params.ID,
"namespace", params.Namespace,
)

logger.Debug("deleting channel")

rules, err := c.repo.ListRules(ctx, ListRulesInput{
Namespaces: []string{params.Namespace},
IncludeDisabled: true,
Expand Down Expand Up @@ -235,6 +257,8 @@ func (c service) DeleteChannel(ctx context.Context, params DeleteChannelInput) e
return fmt.Errorf("failed to delete webhook: %w", err)
}

logger.Debug("webhook associated with channel deleted")

return repo.DeleteChannel(ctx, params)
}

Expand All @@ -254,6 +278,14 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) (
return nil, fmt.Errorf("invalid params: %w", err)
}

logger := c.logger.WithGroup("channel").With(
"operation", "update",
"id", params.ID,
"namespace", params.Namespace,
)

logger.Debug("updating channel")

channel, err := c.repo.GetChannel(ctx, GetChannelInput{
ID: params.ID,
Namespace: params.Namespace,
Expand All @@ -274,6 +306,8 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) (
return nil, fmt.Errorf("failed to create channel: %w", err)
}

logger.Debug("channel updated in repository")

switch params.Type {
case ChannelTypeWebhook:
var headers map[string]string
Expand All @@ -289,10 +323,17 @@ func (c service) UpdateChannel(ctx context.Context, params UpdateChannelInput) (
CustomHeaders: headers,
Disabled: channel.Disabled,
Secret: &channel.Config.WebHook.SigningSecret,
Metadata: map[string]string{
ChannelIDMetadataKey: channel.ID,
},
Description: convert.ToPointer("Notification Channel: " + channel.ID),
})
if err != nil {
return nil, fmt.Errorf("failed to update webhook for channel: %w", err)
}

logger.Debug("webhook is updated")

default:
return nil, fmt.Errorf("invalid channel type: %s", channel.Type)
}
Expand All @@ -316,6 +357,13 @@ func (c service) CreateRule(ctx context.Context, params CreateRuleInput) (*Rule,
return nil, fmt.Errorf("invalid params: %w", err)
}

logger := c.logger.WithGroup("rule").With(
"operation", "create",
"namespace", params.Namespace,
)

logger.Debug("creating rule", "type", params.Type)

txFunc := func(ctx context.Context, repo TxRepository) (*Rule, error) {
rule, err := repo.CreateRule(ctx, params)
if err != nil {
Expand Down Expand Up @@ -400,6 +448,8 @@ func (c service) UpdateRule(ctx context.Context, params UpdateRuleInput) (*Rule,
"namespace", params.Namespace,
)

logger.Debug("updating rule")

rule, err := c.repo.GetRule(ctx, GetRuleInput{
ID: params.ID,
Namespace: params.Namespace,
Expand Down Expand Up @@ -507,6 +557,13 @@ func (c service) CreateEvent(ctx context.Context, params CreateEventInput) (*Eve
return nil, fmt.Errorf("invalid params: %w", err)
}

logger := c.logger.WithGroup("event").With(
"operation", "create",
"namespace", params.Namespace,
)

logger.Debug("creating event")

rule, err := c.repo.GetRule(ctx, GetRuleInput{
Namespace: params.Namespace,
ID: params.RuleID,
Expand Down
62 changes: 46 additions & 16 deletions internal/notification/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,22 @@ type CreateWebhookInput struct {

func (i CreateWebhookInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
return ValidationError{
Err: errors.New("namespace is required"),
}
}

if i.URL == "" {
return errors.New("url is required")
return ValidationError{
Err: errors.New("url is required"),
}
}

if i.Secret != nil {
if i.Secret != nil && *i.Secret != "" {
if err := ValidateSigningSecret(*i.Secret); err != nil {
return fmt.Errorf("invalid secret: %w", err)
return ValidationError{
Err: fmt.Errorf("invalid secret: %w", err),
}
}
}

Expand All @@ -105,23 +111,33 @@ type UpdateWebhookInput struct {

func (i UpdateWebhookInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
return ValidationError{
Err: errors.New("namespace is required"),
}
}

if i.ID == "" {
return errors.New("id is required")
return ValidationError{
Err: errors.New("id is required"),
}
}

if i.URL == "" {
return errors.New("url is required")
return ValidationError{
Err: errors.New("url is required"),
}
}

if i.Secret == nil {
return errors.New("secret is required")
return ValidationError{
Err: errors.New("secret is required"),
}
} else {
secret, _ := strings.CutPrefix(*i.Secret, SigningSecretPrefix)
if _, err := base64.StdEncoding.DecodeString(secret); err != nil {
return errors.New("invalid secret: must be base64 encoded")
return ValidationError{
Err: errors.New("invalid secret: must be base64 encoded"),
}
}
}

Expand All @@ -140,11 +156,15 @@ type UpdateWebhookChannelsInput struct {

func (i UpdateWebhookChannelsInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
return ValidationError{
Err: errors.New("namespace is required"),
}
}

if i.ID == "" {
return errors.New("id is required")
return ValidationError{
Err: errors.New("id is required"),
}
}

return nil
Expand All @@ -160,11 +180,15 @@ type GetWebhookInput struct {

func (i GetWebhookInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
return ValidationError{
Err: errors.New("namespace is required"),
}
}

if i.ID == "" {
return errors.New("id is required")
return ValidationError{
Err: errors.New("id is required"),
}
}

return nil
Expand Down Expand Up @@ -195,15 +219,21 @@ type SendMessageInput struct {

func (i SendMessageInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
return ValidationError{
Err: errors.New("namespace is required"),
}
}

if i.EventType == "" {
return errors.New("event type is required")
return ValidationError{
Err: errors.New("event type is required"),
}
}

if len(i.Payload) == 0 {
return errors.New("payload must not be empty")
return ValidationError{
Err: errors.New("payload must not be empty"),
}
}

return nil
Expand Down

0 comments on commit 52cd526

Please sign in to comment.