Skip to content

Commit

Permalink
refactor: report all calculations
Browse files Browse the repository at this point in the history
This patch ensures that we are reporting all calculations performed by
the balance worker not just the ones done by the recalculator.

We also expose an end-to-end metric from the jobs, so that we can have a
measured upper bound on the job execution times.
  • Loading branch information
turip committed Aug 29, 2024
1 parent 14bba4b commit 93758ae
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 28 deletions.
9 changes: 9 additions & 0 deletions openmeter/entitlement/balanceworker/entitlementhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/entitlement"
entitlementdriver "github.com/openmeterio/openmeter/openmeter/entitlement/driver"
"github.com/openmeterio/openmeter/openmeter/entitlement/snapshot"
Expand Down Expand Up @@ -83,11 +86,17 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementEntity *ent
return nil, fmt.Errorf("failed to get feature: %w", err)
}

calculationStart := time.Now()

value, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
if err != nil {
return nil, fmt.Errorf("failed to get entitlement value: %w", err)
}

w.metricRecalculationTime.Record(ctx, time.Since(calculationStart).Milliseconds(), metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(entitlementEntity.EntitlementType)),
))

mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value)
if err != nil {
return nil, fmt.Errorf("failed to map entitlement value: %w", err)
Expand Down
69 changes: 41 additions & 28 deletions openmeter/entitlement/balanceworker/recalculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ const (

defaultLRUCacheSize = 10_000

metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
)
metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
metricNameRecalculationJobCalculationTime = "balance_worker.entitlement_recalculation_job_calculation_time_ms"

var (
recalculationTimeUpdateAttribute = attribute.String("operation", "update")
recalculationTimeDeleteAttribute = attribute.String("operation", "delete")
metricAttributeKeyEntitltementType = "entitlement_type"
)

type RecalculatorOptions struct {
Expand Down Expand Up @@ -66,7 +64,8 @@ type Recalculator struct {
featureCache *lru.Cache[string, productcatalog.Feature]
subjectCache *lru.Cache[string, models.Subject]

metricRecalculationTime metric.Int64Histogram
metricRecalculationTime metric.Int64Histogram
metricRecalculationJobRecalculationTime metric.Int64Histogram
}

func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
Expand All @@ -92,11 +91,20 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

metricRecalculationJobRecalculationTime, err := opts.MetricMeter.Int64Histogram(
metricNameRecalculationJobCalculationTime,
metric.WithDescription("Time takes to recalculate the entitlements including the necessary data fetches"),
)
if err != nil {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

return &Recalculator{
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
metricRecalculationTime: metricRecalculationTime,
opts: opts,
featureCache: featureCache,
subjectCache: subjectCache,
metricRecalculationTime: metricRecalculationTime,
metricRecalculationJobRecalculationTime: metricRecalculationJobRecalculationTime,
}, nil
}

Expand All @@ -123,30 +131,29 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
var errs error
for _, ent := range entitlements {
start := time.Now()
if ent.DeletedAt != nil {
err := r.sendEntitlementDeletedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeDeleteAttribute))
} else {
err := r.sendEntitlementUpdatedEvent(ctx, ent)
if err != nil {
errs = errors.Join(errs, err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeUpdateAttribute))

if err := r.sendEntitlementEvent(ctx, ent); err != nil {
errs = errors.Join(errs, fmt.Errorf("error sending event for entitlement [id=%s]: %w", ent.ID, err))
}

r.metricRecalculationJobRecalculationTime.Record(ctx,
time.Since(start).Milliseconds(),
metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)),
))
}

return errs
}

func (r *Recalculator) sendEntitlementEvent(ctx context.Context, ent entitlement.Entitlement) error {
if ent.DeletedAt != nil {
return r.sendEntitlementDeletedEvent(ctx, ent)
}

return r.sendEntitlementUpdatedEvent(ctx, ent)
}

func (r *Recalculator) sendEntitlementDeletedEvent(ctx context.Context, ent entitlement.Entitlement) error {
subject, err := r.getSubjectByKey(ctx, ent.Namespace, ent.SubjectKey)
if err != nil {
Expand Down Expand Up @@ -196,6 +203,12 @@ func (r *Recalculator) sendEntitlementUpdatedEvent(ctx context.Context, ent enti
return fmt.Errorf("failed to get entitlement value: %w", err)
}

r.metricRecalculationTime.Record(ctx,
time.Since(calculatedAt).Milliseconds(),
metric.WithAttributes(
attribute.String(metricAttributeKeyEntitltementType, string(ent.EntitlementType)),
))

mappedValues, err := entitlementdriver.MapEntitlementValueToAPI(value)
if err != nil {
return fmt.Errorf("failed to map entitlement value: %w", err)
Expand Down
12 changes: 12 additions & 0 deletions openmeter/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Worker struct {
router *message.Router

highWatermarkCache *lru.Cache[string, highWatermarkCacheEntry]

metricRecalculationTime metric.Int64Histogram
}

func New(opts WorkerOptions) (*Worker, error) {
Expand All @@ -72,11 +74,21 @@ func New(opts WorkerOptions) (*Worker, error) {
return nil, fmt.Errorf("failed to create high watermark cache: %w", err)
}

metricRecalculationTime, err := opts.Router.MetricMeter.Int64Histogram(
metricNameRecalculationTime,
metric.WithDescription("Entitlement recalculation time"),
)
if err != nil {
return nil, fmt.Errorf("failed to create recalculation time histogram: %w", err)
}

worker := &Worker{
opts: opts,
entitlement: opts.Entitlement,
repo: opts.Repo,
highWatermarkCache: highWatermarkCache,

metricRecalculationTime: metricRecalculationTime,
}

router, err := router.NewDefaultRouter(opts.Router)
Expand Down

0 comments on commit 93758ae

Please sign in to comment.