Skip to content

Commit

Permalink
Merge pull request #1445 from openmeterio/refactor/report-all-calcula…
Browse files Browse the repository at this point in the history
…tions

refactor: balance worker metrics
  • Loading branch information
turip authored Aug 29, 2024
2 parents 14bba4b + 93758ae commit 0485818
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 28 deletions.
9 changes: 9 additions & 0 deletions openmeter/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/entitlement"
entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver"
"github.com/openmeterio/openmeter/openmeter/entitlement/snapshot"
Expand Down Expand Up @@ -83,11 +86,17 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementEntity *ent
return nil, fmt.Errorf("failed to get feature: %w", err)
}

calculationStart := time.Now()

value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement value: %w", err)
}

w.metricRecalculationTime.Record(ctx, time.Since(calculationStart).Milliseconds(), metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(entitlementEntity.EntitlementType)),
))

mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value)
if err != nil {
return nil, fmt.Errorf("failed to map entitlement value: %w", err)
Expand Down
69 changes: 41 additions & 28 deletions openmeter/entitlement/balanceworker/recalculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ const (

defaultLRUCacheSize = 10_000

metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
)
metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
metricNameRecalculationJobCalculationTime = "balance_worker.entitlement_recalculation_job_calculation_time_ms"

var (
recalculationTimeUpdateAttribute = attribute.String("operation", "update")
recalculationTimeDeleteAttribute = attribute.String("operation", "delete")
metricAttributeKeyEntitltementType = "entitlement_type"
)

type RecalculatorOptions struct {
Expand Down Expand Up @@ -66,7 +64,8 @@ type Recalculator struct {
featureCache *lru.Cache[string, productcatalog.Feature]
subjectCache *lru.Cache[string, models.Subject]

metricRecalculationTime metric.Int64Histogram
metricRecalculationTime metric.Int64Histogram
metricRecalculationJobRecalculationTime metric.Int64Histogram
}

func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
Expand All @@ -92,11 +91,20 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

metricRecalculationJobRecalculationTime, err := opts.MetricMeter.Int64Histogram(
metricNameRecalculationJobCalculationTime,
metric.WithDescription("Time takes to recalculate the entitlements including the necessary data fetches"),
)
if err != nil {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

return &Recalculator{
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
metricRecalculationTime: metricRecalculationTime,
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
metricRecalculationTime: metricRecalculationTime,
metricRecalculationJobRecalculationTime: metricRecalculationJobRecalculationTime,
}, nil
}

Expand All @@ -123,30 +131,29 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
var errs error
for _, ent := range entitlements {
start := time.Now()
if ent.DeletedAt != nil {
err := r.sendEntitlementDeletedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeDeleteAttribute))
} else {
err := r.sendEntitlementUpdatedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeUpdateAttribute))

if err := r.sendEntitlementEvent(ctx, ent); err != nil {
errs = errors.Join(errs, fmt.Errorf("error sending event for entitlement [id=%s]: %w", ent.ID, err))
}

r.metricRecalculationJobRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)),
))
}

return errs
}

func (r *Recalculator) sendEntitlementEvent(ctx context.Context, ent entitlement.Entitlement) error {
if ent.DeletedAt != nil {
return r.sendEntitlementDeletedEvent(ctx, ent)
}

return r.sendEntitlementUpdatedEvent(ctx, ent)
}

func (r *Recalculator) sendEntitlementDeletedEvent(ctx context.Context, ent entitlement.Entitlement) error {
subject, err := r.getSubjectByKey(ctx, ent.Namespace, ent.SubjectKey)
if err != nil {
Expand Down Expand Up @@ -196,6 +203,12 @@ func (r *Recalculator) sendEntitlementUpdatedEvent(ctx context.Context, ent enti
return fmt.Errorf("failed to get entitlement value: %w", err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(calculatedAt).Milliseconds(),
metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)),
))

mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value)
if err != nil {
return fmt.Errorf("failed to map entitlement value: %w", err)
Expand Down
12 changes: 12 additions & 0 deletions openmeter/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Worker struct {
router *message.Router

highWatermarkCache *lru.Cache[string, highWatermarkCacheEntry]

metricRecalculationTime metric.Int64Histogram
}

func New(opts WorkerOptions) (*Worker, error) {
Expand All @@ -72,11 +74,21 @@ func New(opts WorkerOptions) (*Worker, error) {
return nil, fmt.Errorf("failed to create high watermark cache: %w", err)
}

metricRecalculationTime, err := opts.Router.MetricMeter.Int64Histogram(
metricNameRecalculationTime,
metric.WithDescription("Entitlement recalculation time"),
)
if err != nil {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

worker := &Worker{
opts: opts,
entitlement: opts.Entitlement,
repo: opts.Repo,
highWatermarkCache: highWatermarkCache,

metricRecalculationTime: metricRecalculationTime,
}

router, err := router.NewDefaultRouter(opts.Router)
Expand Down

0 comments on commit 0485818

Please sign in to comment.