From 93758ae1ef532a47be7b1f0972bc0039ebfd9bc8 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Aug 2024 13:33:15 +0200 Subject: [PATCH] refactor: report all calculations This patch ensures that we are reporting all calculations performed by the balance worker not just the ones done by the recalculator. We also expose an end-to-end metric from the jobs, so that we can have a measured upper bound on the job execution times. --- .../balanceworker/entitlementhandler.go | 9 +++ .../entitlement/balanceworker/recalculate.go | 69 +++++++++++-------- openmeter/entitlement/balanceworker/worker.go | 12 ++++ 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/openmeter/entitlement/balanceworker/entitlementhandler.go b/openmeter/entitlement/balanceworker/entitlementhandler.go index 0a9be7f86..02c6dc0ec 100644 --- a/openmeter/entitlement/balanceworker/entitlementhandler.go +++ b/openmeter/entitlement/balanceworker/entitlementhandler.go @@ -5,6 +5,9 @@ import ( "fmt" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "github.com/openmeterio/openmeter/openmeter/entitlement" entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver" "github.com/openmeterio/openmeter/openmeter/entitlement/snapshot" @@ -83,11 +86,17 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementEntity *ent return nil, fmt.Errorf("failed to get feature: %w", err) } + calculationStart := time.Now() + value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt) if err != nil { return nil, fmt.Errorf("failed to get entitlement value: %w", err) } + w.metricRecalculationTime.Record(ctx, time.Since(calculationStart).Milliseconds(), metric.WithAttributes( + attribute.String(metricAttributeKeyEntitltementType, string(entitlementEntity.EntitlementType)), + )) + mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value) if err != nil { return nil, fmt.Errorf("failed to map entitlement value: %w", err) diff --git a/openmeter/entitlement/balanceworker/recalculate.go b/openmeter/entitlement/balanceworker/recalculate.go index 1fe43eb53..c0add659b 100644 --- a/openmeter/entitlement/balanceworker/recalculate.go +++ b/openmeter/entitlement/balanceworker/recalculate.go @@ -29,12 +29,10 @@ const ( defaultLRUCacheSize = 10_000 - metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms" -) + metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms" + metricNameRecalculationJobCalculationTime = "balance_worker.entitlement_recalculation_job_calculation_time_ms" -var ( - recalculationTimeUpdateAttribute = attribute.String("operation", "update") - recalculationTimeDeleteAttribute = attribute.String("operation", "delete") + metricAttributeKeyEntitltementType = "entitlement_type" ) type RecalculatorOptions struct { @@ -66,7 +64,8 @@ type Recalculator struct { featureCache *lru.Cache[string, productcatalog.Feature] subjectCache *lru.Cache[string, models.Subject] - metricRecalculationTime metric.Int64Histogram + metricRecalculationTime metric.Int64Histogram + metricRecalculationJobRecalculationTime metric.Int64Histogram } func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { @@ -92,11 +91,20 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) { return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err) } + metricRecalculationJobRecalculationTime, err := opts.MetricMeter.Int64Histogram( + metricNameRecalculationJobCalculationTime, + metric.WithDescription("Time takes to recalculate the entitlements including the necessary data fetches"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err) + } + return &Recalculator{ - opts: opts, - featureCache: featureCache, - subjectCache: subjectCache, - metricRecalculationTime: metricRecalculationTime, + opts: opts, + featureCache: featureCache, + subjectCache: subjectCache, + metricRecalculationTime: metricRecalculationTime, + metricRecalculationJobRecalculationTime: metricRecalculationJobRecalculationTime, }, nil } @@ -123,30 +131,29 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e var errs error for _, ent := range entitlements { start := time.Now() - if ent.DeletedAt != nil { - err := r.sendEntitlementDeletedEvent(ctx, ent) - if err != nil { - errs = errors.Join(errs, err) - } - - r.metricRecalculationTime.Record(ctx, - time.Since(start).Milliseconds(), - metric.WithAttributes(recalculationTimeDeleteAttribute)) - } else { - err := r.sendEntitlementUpdatedEvent(ctx, ent) - if err != nil { - errs = errors.Join(errs, err) - } - - r.metricRecalculationTime.Record(ctx, - time.Since(start).Milliseconds(), - metric.WithAttributes(recalculationTimeUpdateAttribute)) + + if err := r.sendEntitlementEvent(ctx, ent); err != nil { + errs = errors.Join(errs, fmt.Errorf("error sending event for entitlement [id=%s]: %w", ent.ID, err)) } + + r.metricRecalculationJobRecalculationTime.Record(ctx, + time.Since(start).Milliseconds(), + metric.WithAttributes( + attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)), + )) } return errs } +func (r *Recalculator) sendEntitlementEvent(ctx context.Context, ent entitlement.Entitlement) error { + if ent.DeletedAt != nil { + return r.sendEntitlementDeletedEvent(ctx, ent) + } + + return r.sendEntitlementUpdatedEvent(ctx, ent) +} + func (r *Recalculator) sendEntitlementDeletedEvent(ctx context.Context, ent entitlement.Entitlement) error { subject, err := r.getSubjectByKey(ctx, ent.Namespace, ent.SubjectKey) if err != nil { @@ -196,6 +203,12 @@ func (r *Recalculator) sendEntitlementUpdatedEvent(ctx context.Context, ent enti return fmt.Errorf("failed to get entitlement value: %w", err) } + r.metricRecalculationTime.Record(ctx, + time.Since(calculatedAt).Milliseconds(), + metric.WithAttributes( + attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)), + )) + mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value) if err != nil { return fmt.Errorf("failed to map entitlement value: %w", err) diff --git a/openmeter/entitlement/balanceworker/worker.go b/openmeter/entitlement/balanceworker/worker.go index 3f608c938..fd648c2fd 100644 --- a/openmeter/entitlement/balanceworker/worker.go +++ b/openmeter/entitlement/balanceworker/worker.go @@ -64,6 +64,8 @@ type Worker struct { router *message.Router highWatermarkCache *lru.Cache[string, highWatermarkCacheEntry] + + metricRecalculationTime metric.Int64Histogram } func New(opts WorkerOptions) (*Worker, error) { @@ -72,11 +74,21 @@ func New(opts WorkerOptions) (*Worker, error) { return nil, fmt.Errorf("failed to create high watermark cache: %w", err) } + metricRecalculationTime, err := opts.Router.MetricMeter.Int64Histogram( + metricNameRecalculationTime, + metric.WithDescription("Entitlement recalculation time"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err) + } + worker := &Worker{ opts: opts, entitlement: opts.Entitlement, repo: opts.Repo, highWatermarkCache: highWatermarkCache, + + metricRecalculationTime: metricRecalculationTime, } router, err := router.NewDefaultRouter(opts.Router)