From 7c768a76cee6baa4166490a8b6d930647817ade9 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Thu, 30 Jan 2025 15:52:25 +0100 Subject: [PATCH] refactor: use wire for jobs service --- app/common/billing.go | 8 + app/common/telemetry.go | 15 + cmd/jobs/billing/advance/advance.go | 198 +--------- cmd/jobs/config/loader.go | 46 --- cmd/jobs/entitlement/init.go | 114 ------ cmd/jobs/entitlement/recalculatesnapshots.go | 37 +- cmd/jobs/internal/app.go | 18 + cmd/jobs/internal/config.go | 31 ++ cmd/jobs/internal/globals.go | 11 + cmd/jobs/internal/version.go | 38 ++ cmd/jobs/internal/wire.go | 93 +++++ cmd/jobs/internal/wire_gen.go | 388 +++++++++++++++++++ cmd/jobs/main.go | 64 +-- cmd/jobs/service/otel.go | 95 ----- cmd/jobs/version.go | 34 +- 15 files changed, 644 insertions(+), 546 deletions(-) delete mode 100644 cmd/jobs/config/loader.go delete mode 100644 cmd/jobs/entitlement/init.go create mode 100644 cmd/jobs/internal/app.go create mode 100644 cmd/jobs/internal/config.go create mode 100644 cmd/jobs/internal/globals.go create mode 100644 cmd/jobs/internal/version.go create mode 100644 cmd/jobs/internal/wire.go create mode 100644 cmd/jobs/internal/wire_gen.go delete mode 100644 cmd/jobs/service/otel.go diff --git a/app/common/billing.go b/app/common/billing.go index 728882322..6d65beb97 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -12,6 +12,7 @@ import ( billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" billingsubscription "github.com/openmeterio/openmeter/openmeter/billing/subscription" + billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance" "github.com/openmeterio/openmeter/openmeter/customer" entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/meter" @@ -74,3 +75,10 @@ func BillingSubscriptionValidator( return billingsubscription.NewValidator(billingService) } + +func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*billingworkerautoadvance.AutoAdvancer, error) { + return billingworkerautoadvance.NewAdvancer(billingworkerautoadvance.Config{ + BillingService: service, + Logger: logger, + }) +} diff --git a/app/common/telemetry.go b/app/common/telemetry.go index 3676247b4..2bc6bf634 100644 --- a/app/common/telemetry.go +++ b/app/common/telemetry.go @@ -33,6 +33,21 @@ import ( "github.com/openmeterio/openmeter/pkg/gosundheit" ) +var TelemetryWithoutServer = wire.NewSet( + NewTelemetryResource, + + NewLoggerProvider, + wire.Bind(new(log.LoggerProvider), new(*sdklog.LoggerProvider)), + NewLogger, + + NewMeterProvider, + wire.Bind(new(metric.MeterProvider), new(*sdkmetric.MeterProvider)), + NewMeter, + NewTracerProvider, + wire.Bind(new(trace.TracerProvider), new(*sdktrace.TracerProvider)), + NewTracer, +) + var Telemetry = wire.NewSet( NewTelemetryResource, diff --git a/cmd/jobs/billing/advance/advance.go b/cmd/jobs/billing/advance/advance.go index e6a89f79d..c77fd1975 100644 --- a/cmd/jobs/billing/advance/advance.go +++ b/cmd/jobs/billing/advance/advance.go @@ -1,24 +1,12 @@ package advance import ( - "context" "fmt" - "log/slog" "github.com/spf13/cobra" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "github.com/openmeterio/openmeter/app/common" - appconfig "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/cmd/jobs/config" + "github.com/openmeterio/openmeter/cmd/jobs/internal" "github.com/openmeterio/openmeter/openmeter/billing" - billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" - billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" - billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance" - registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" - "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" - "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver" - "github.com/openmeterio/openmeter/pkg/framework/pgdriver" ) var namespace string @@ -36,167 +24,17 @@ func init() { Cmd.PersistentFlags().StringVar(&namespace, "namespace", "", "namespace the operation should be performed") } -type autoAdvancer struct { - *billingworkerautoadvance.AutoAdvancer - - Shutdown func() -} - -func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*autoAdvancer, error) { - commonMetadata := common.NewMetadata(conf, "0.0.0", "billing-advancer") - - // We use a noop meter provider as we don't want to monitor cronjobs (for now) - meterProvider := sdkmetric.NewMeterProvider() - meter := meterProvider.Meter("billing-advancer") - - // Initialize Postgres driver - postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL) - if err != nil { - return nil, fmt.Errorf("failed to initialize postgres driver: %w", err) - } - - // Initialize Ent driver - entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB()) - - meterRepository := common.NewInMemoryRepository(conf.Meters) - - clickhouseConn, err := common.NewClickHouse(conf.Aggregation.ClickHouse) - if err != nil { - return nil, fmt.Errorf("failed to initialize clickhouse connection: %w", err) - } - - streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickhouseConn, meterRepository, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize streaming connection: %w", err) - } - - brokerOptions := common.NewBrokerConfiguration(conf.Ingest.Kafka.KafkaConfiguration, conf.Telemetry.Log, commonMetadata, logger, meter) - - adminClient, err := common.NewKafkaAdminClient(conf.Ingest.Kafka.KafkaConfiguration) - if err != nil { - return nil, fmt.Errorf("failed to initialize kafka admin client: %w", err) - } - - kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, conf.Ingest.Kafka.TopicProvisionerConfig) - - topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) - if err != nil { - return nil, fmt.Errorf("failed to initialize kafka topic provisioner: %w", err) - } - - publisher, serverPublisherShutdown, err := common.NewServerPublisher(ctx, kafka.PublisherOptions{ - Broker: brokerOptions, - ProvisionTopics: common.ServerProvisionTopics(conf.Events), - TopicProvisioner: topicProvisioner, - }, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize server publisher: %w", err) - } - - ebPublisher, err := common.NewEventBusPublisher(publisher, conf.Events, logger) - if err != nil { - return nil, fmt.Errorf("failed to initialize event bus publisher: %w", err) - } - - entitlementRegistry := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ - DatabaseClient: entPostgresDriver.Client(), - StreamingConnector: streamingConnector, - Logger: logger, - MeterRepository: meterRepository, - Publisher: ebPublisher, - }) - - customerService, err := common.NewCustomerService(logger, entPostgresDriver.Client(), entitlementRegistry) - if err != nil { - return nil, fmt.Errorf("failed to initialize customer service: %w", err) - } - - secretService, err := common.NewUnsafeSecretService(logger, entPostgresDriver.Client()) - if err != nil { - return nil, fmt.Errorf("failed to initialize secret service: %w", err) - } - - appService, err := common.NewAppService(logger, entPostgresDriver.Client(), conf.Apps) - if err != nil { - return nil, fmt.Errorf("failed to initialize app service: %w", err) - } - - _, err = common.NewAppStripeService(logger, entPostgresDriver.Client(), conf.Apps, appService, customerService, secretService) - if err != nil { - return nil, fmt.Errorf("failed to initialize stripe app service: %w", err) - } - - namespaceManager, err := common.NewNamespaceManager(nil, conf.Namespace) - if err != nil { - return nil, fmt.Errorf("failed to initialize namespace manager: %w", err) - } - - billingAdapter, err := billingadapter.New(billingadapter.Config{ - Client: entPostgresDriver.Client(), - Logger: logger, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize billing adapter: %w", err) - } - - billingService, err := billingservice.New(billingservice.Config{ - Adapter: billingAdapter, - CustomerService: customerService, - AppService: appService, - Logger: logger, - FeatureService: entitlementRegistry.Feature, - MeterRepo: meterRepository, - StreamingConnector: streamingConnector, - Publisher: ebPublisher, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize billing service: %w", err) - } - - _, err = common.NewAppSandboxProvisioner(ctx, logger, conf.Apps, appService, namespaceManager, billingService) - if err != nil { - return nil, fmt.Errorf("failed to initialize sandbox app provisioner: %w", err) - } - - a, err := billingworkerautoadvance.NewAdvancer(billingworkerautoadvance.Config{ - BillingService: billingService, - Logger: logger, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize billing auto-advancer: %w", err) - } - - return &autoAdvancer{ - AutoAdvancer: a, - Shutdown: serverPublisherShutdown, - }, nil -} - var ListCmd = func() *cobra.Command { cmd := &cobra.Command{ Use: "list", Short: "List invoices which can be advanced", RunE: func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - - conf, err := config.GetConfig() - if err != nil { - return err - } - - a, err := NewAutoAdvancer(ctx, conf, slog.Default()) - if err != nil { - return err - } - - defer a.Shutdown() - var ns []string if namespace != "" { ns = append(ns, namespace) } - invoices, err := a.ListInvoicesToAdvance(ctx, ns, nil) + invoices, err := internal.App.BillingAutoAdvancer.ListInvoicesToAdvance(cmd.Context(), ns, nil) if err != nil { return err } @@ -218,26 +56,12 @@ var InvoiceCmd = func() *cobra.Command { Short: "Advance invoice(s)", Args: cobra.MinimumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - - conf, err := config.GetConfig() - if err != nil { - return err - } - - a, err := NewAutoAdvancer(ctx, conf, slog.Default()) - if err != nil { - return err - } - - defer a.Shutdown() - if namespace == "" { return fmt.Errorf("invoice namespace is required") } for _, invoiceID := range args { - _, err := a.AdvanceInvoice(ctx, billing.InvoiceID{ + _, err := internal.App.BillingAutoAdvancer.AdvanceInvoice(cmd.Context(), billing.InvoiceID{ Namespace: namespace, ID: invoiceID, }) @@ -260,26 +84,12 @@ var AllCmd = func() *cobra.Command { Use: "all", Short: "Advance all eligible invoices", RunE: func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - - conf, err := config.GetConfig() - if err != nil { - return err - } - - a, err := NewAutoAdvancer(ctx, conf, slog.Default()) - if err != nil { - return err - } - - defer a.Shutdown() - var ns []string if namespace != "" { ns = append(ns, namespace) } - return a.All(ctx, ns, batchSize) + return internal.App.BillingAutoAdvancer.All(cmd.Context(), ns, batchSize) }, } diff --git a/cmd/jobs/config/loader.go b/cmd/jobs/config/loader.go deleted file mode 100644 index c7bbd3f9a..000000000 --- a/cmd/jobs/config/loader.go +++ /dev/null @@ -1,46 +0,0 @@ -package config - -import ( - "errors" - - "github.com/spf13/pflag" - "github.com/spf13/viper" - - "github.com/openmeterio/openmeter/app/config" -) - -func LoadConfig(fileName string) (config.Configuration, error) { - v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) - - config.SetViperDefaults(v, flags) - if fileName != "" { - v.SetConfigFile(fileName) - } - - err := v.ReadInConfig() - if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { - return config.Configuration{}, err - } - - var conf config.Configuration - err = v.Unmarshal(&conf) - if err != nil { - return conf, err - } - - return conf, conf.Validate() -} - -var defaultConfig *config.Configuration - -func GetConfig() (config.Configuration, error) { - if defaultConfig == nil { - return config.Configuration{}, errors.New("config not set") - } - - return *defaultConfig, nil -} - -func SetConfig(c config.Configuration) { - defaultConfig = &c -} diff --git a/cmd/jobs/entitlement/init.go b/cmd/jobs/entitlement/init.go deleted file mode 100644 index 071552c50..000000000 --- a/cmd/jobs/entitlement/init.go +++ /dev/null @@ -1,114 +0,0 @@ -package entitlement - -import ( - "context" - "fmt" - "log/slog" - - "github.com/ClickHouse/clickhouse-go/v2" - "go.opentelemetry.io/otel/metric" - - "github.com/openmeterio/openmeter/app/common" - "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/meter" - "github.com/openmeterio/openmeter/openmeter/registry" - registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" - watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" - "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" - entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver" - "github.com/openmeterio/openmeter/pkg/framework/pgdriver" - "github.com/openmeterio/openmeter/pkg/models" - "github.com/openmeterio/openmeter/pkg/slicesx" -) - -type entitlementConnectors struct { - Registry *registry.Entitlement - EventBus eventbus.Publisher - Shutdown func() -} - -func initEntitlements(ctx context.Context, conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter, otelName string) (*entitlementConnectors, error) { - // Initialize Postgres driver - postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL) - if err != nil { - return nil, fmt.Errorf("error initializing postgres driver: %w", err) - } - - // Initialize Ent driver - entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB()) - - logger.Info("Postgres client initialized") - - // Meter repository - meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter { - return *meter - })) - - // streaming connector - clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions()) - if err != nil { - return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err) - } - - streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickHouseClient, meterRepository, logger) - if err != nil { - return nil, fmt.Errorf("init clickhouse streaming: %w", err) - } - - // event publishing - eventPublisherDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ - Broker: watermillkafka.BrokerOptions{ - KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, - ClientID: otelName, - Logger: logger, - MetricMeter: metricMeter, - DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, - }, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize event publisher driver: %w", err) - } - - eventPublisher, err := eventbus.New(eventbus.Options{ - Publisher: eventPublisherDriver, - TopicMapping: conf.Events.EventBusTopicMapping(), - Logger: logger, - MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize event publisher: %w", err) - } - - entitlementRegistry := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ - DatabaseClient: entPostgresDriver.Client(), - StreamingConnector: streamingConnector, - MeterRepository: meterRepository, - Logger: logger, - Publisher: eventPublisher, - }) - - return &entitlementConnectors{ - Registry: entitlementRegistry, - EventBus: eventPublisher, - Shutdown: func() { - if err := entPostgresDriver.Close(); err != nil { - logger.Error("failed to close ent driver", "error", err) - } - - if postgresDriver != nil { - err := postgresDriver.Close() - if err != nil { - logger.Error("failed to close postgres driver", "error", err) - } - } - - if err := clickHouseClient.Close(); err != nil { - logger.Error("failed to close clickhouse client", "error", err) - } - - if err := eventPublisherDriver.Close(); err != nil { - logger.Error("failed to close event publisher", "error", err) - } - }, - }, nil -} diff --git a/cmd/jobs/entitlement/recalculatesnapshots.go b/cmd/jobs/entitlement/recalculatesnapshots.go index 72dc7df8a..388ada016 100644 --- a/cmd/jobs/entitlement/recalculatesnapshots.go +++ b/cmd/jobs/entitlement/recalculatesnapshots.go @@ -1,50 +1,21 @@ package entitlement import ( - "log/slog" - "github.com/spf13/cobra" - "go.opentelemetry.io/otel" - "github.com/openmeterio/openmeter/cmd/jobs/config" + "github.com/openmeterio/openmeter/cmd/jobs/internal" "github.com/openmeterio/openmeter/openmeter/entitlement/balanceworker" ) -const ( - otelNameRecalculateBalanceSnapshot = "openmeter.io/jobs/entitlement/recalculate-balance-snapshots" -) - func NewRecalculateBalanceSnapshotsCommand() *cobra.Command { return &cobra.Command{ Use: "recalculate-balance-snapshots", Short: "Recalculate balance snapshots and send the resulting events into the eventbus", RunE: func(cmd *cobra.Command, args []string) error { - conf, err := config.GetConfig() - if err != nil { - return err - } - - logger := slog.Default() - - metricMeter := otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot) - - entitlementConnectors, err := initEntitlements( - cmd.Context(), - conf, - logger, - metricMeter, - otelNameRecalculateBalanceSnapshot, - ) - if err != nil { - return err - } - - defer entitlementConnectors.Shutdown() - recalculator, err := balanceworker.NewRecalculator(balanceworker.RecalculatorOptions{ - Entitlement: entitlementConnectors.Registry, - EventBus: entitlementConnectors.EventBus, - MetricMeter: metricMeter, + Entitlement: internal.App.EntitlementRegistry, + EventBus: internal.App.EventPublisher, + MetricMeter: internal.App.Meter, }) if err != nil { return err diff --git a/cmd/jobs/internal/app.go b/cmd/jobs/internal/app.go new file mode 100644 index 000000000..40e987ade --- /dev/null +++ b/cmd/jobs/internal/app.go @@ -0,0 +1,18 @@ +package internal + +import "context" + +func InitializeApplication(ctx context.Context, configFile string) error { + var err error + + if err = loadConfig(configFile); err != nil { + return err + } + + App, AppShutdown, err = initializeApplication(ctx, Config) + if err != nil { + return err + } + + return nil +} diff --git a/cmd/jobs/internal/config.go b/cmd/jobs/internal/config.go new file mode 100644 index 000000000..247f95c4f --- /dev/null +++ b/cmd/jobs/internal/config.go @@ -0,0 +1,31 @@ +package internal + +import ( + "errors" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/app/config" +) + +func loadConfig(configFile string) error { + v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) + + config.SetViperDefaults(v, flags) + if configFile != "" { + v.SetConfigFile(configFile) + } + + err := v.ReadInConfig() + if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { + return err + } + + err = v.Unmarshal(&Config) + if err != nil { + return err + } + + return Config.Validate() +} diff --git a/cmd/jobs/internal/globals.go b/cmd/jobs/internal/globals.go new file mode 100644 index 000000000..ada89d9a5 --- /dev/null +++ b/cmd/jobs/internal/globals.go @@ -0,0 +1,11 @@ +package internal + +import "github.com/openmeterio/openmeter/app/config" + +//nolint:gochecknoglobals +var ( + App Application + AppShutdown func() + Config config.Configuration + ConfigFile string +) diff --git a/cmd/jobs/internal/version.go b/cmd/jobs/internal/version.go new file mode 100644 index 000000000..81990a2a9 --- /dev/null +++ b/cmd/jobs/internal/version.go @@ -0,0 +1,38 @@ +package internal + +import ( + "runtime/debug" +) + +//nolint:gochecknoglobals +var ( + version string + revision string + revisionDate string +) + +//nolint:gochecknoinits,goconst +func init() { + if version == "" { + version = "unknown" + } + + buildInfo, _ := debug.ReadBuildInfo() + + revision = "unknown" + revisionDate = "unknown" + + for _, setting := range buildInfo.Settings { + if setting.Key == "vcs.revision" { + revision = setting.Value + } + + if setting.Key == "vcs.time" { + revisionDate = setting.Value + } + } +} + +func Version() (string, string, string) { + return version, revision, revisionDate +} diff --git a/cmd/jobs/internal/wire.go b/cmd/jobs/internal/wire.go new file mode 100644 index 000000000..4f63fbcbb --- /dev/null +++ b/cmd/jobs/internal/wire.go @@ -0,0 +1,93 @@ +//go:build wireinject +// +build wireinject + +package internal + +import ( + "context" + "log/slog" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/google/wire" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance" + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + "github.com/openmeterio/openmeter/openmeter/registry" + "github.com/openmeterio/openmeter/openmeter/secret" + "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" +) + +type Application struct { + common.GlobalInitializer + common.Migrator + + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Customer customer.Service + Billing billing.Service + BillingAutoAdvancer *billingworkerautoadvance.AutoAdvancer + EntClient *db.Client + EventPublisher eventbus.Publisher + EntitlementRegistry *registry.Entitlement + FeatureConnector feature.FeatureConnector + KafkaProducer *kafka.Producer + KafkaMetrics *kafkametrics.Metrics + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Meter metric.Meter + Plan plan.Service + Secret secret.Service + Subscription common.SubscriptionServiceWithWorkflow + StreamingConnector streaming.Connector +} + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + wire.Build( + metadata, + common.App, + common.Billing, + common.ClickHouse, + common.Config, + common.Customer, + common.Database, + common.Entitlement, + common.Framework, + common.Kafka, + common.KafkaNamespaceResolver, + common.MeterInMemory, + common.Namespace, + common.NewBillingAutoAdvancer, + common.NewDefaultTextMapPropagator, + common.NewServerPublisher, + common.Streaming, + common.ProductCatalog, + common.Subscription, + common.Secret, + common.ServerProvisionTopics, + common.TelemetryWithoutServer, + common.WatermillNoPublisher, + wire.Struct(new(Application), "*"), + ) + + return Application{}, nil, nil +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "jobs") +} diff --git a/cmd/jobs/internal/wire_gen.go b/cmd/jobs/internal/wire_gen.go new file mode 100644 index 000000000..c023349c9 --- /dev/null +++ b/cmd/jobs/internal/wire_gen.go @@ -0,0 +1,388 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package internal + +import ( + "context" + kafka2 "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/billing/worker/advance" + "github.com/openmeterio/openmeter/openmeter/customer" + "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + "github.com/openmeterio/openmeter/openmeter/registry" + "github.com/openmeterio/openmeter/openmeter/secret" + "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/pkg/kafka/metrics" + "go.opentelemetry.io/otel/metric" + "log/slog" +) + +// Injectors from wire.go: + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + telemetryConfig := conf.Telemetry + logTelemetryConfig := telemetryConfig.Log + commonMetadata := metadata(conf) + resource := common.NewTelemetryResource(commonMetadata) + loggerProvider, cleanup, err := common.NewLoggerProvider(ctx, logTelemetryConfig, resource) + if err != nil { + return Application{}, nil, err + } + logger := common.NewLogger(logTelemetryConfig, resource, loggerProvider, commonMetadata) + metricsTelemetryConfig := telemetryConfig.Metrics + meterProvider, cleanup2, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) + if err != nil { + cleanup() + return Application{}, nil, err + } + traceTelemetryConfig := telemetryConfig.Trace + tracerProvider, cleanup3, err := common.NewTracerProvider(ctx, traceTelemetryConfig, resource, logger) + if err != nil { + cleanup2() + cleanup() + return Application{}, nil, err + } + textMapPropagator := common.NewDefaultTextMapPropagator() + globalInitializer := common.GlobalInitializer{ + Logger: logger, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + postgresConfig := conf.Postgres + meter := common.NewMeter(meterProvider, commonMetadata) + driver, cleanup4, err := common.NewPostgresDriver(ctx, postgresConfig, meterProvider, meter, tracerProvider, logger) + if err != nil { + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + db := common.NewDB(driver) + entPostgresDriver, cleanup5 := common.NewEntPostgresDriver(db, logger) + client := common.NewEntClient(entPostgresDriver) + migrator := common.Migrator{ + Config: postgresConfig, + Client: client, + Logger: logger, + } + appsConfiguration := conf.Apps + service, err := common.NewAppService(logger, client, appsConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + entitlementsConfiguration := conf.Entitlements + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v2 := conf.Meters + inMemoryRepository := common.NewInMemoryRepository(v2) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + ingestConfiguration := conf.Ingest + kafkaIngestConfiguration := ingestConfiguration.Kafka + kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + eventsConfiguration := conf.Events + v3 := common.ServerProvisionTopics(eventsConfiguration) + adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + publisherOptions := kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: v3, + TopicProvisioner: topicProvisioner, + } + publisher, cleanup6, err := common.NewServerPublisher(ctx, publisherOptions, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + entitlement := common.NewEntitlementRegistry(logger, client, entitlementsConfiguration, connector, inMemoryRepository, eventbusPublisher) + customerService, err := common.NewCustomerService(logger, client, entitlement) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + secretserviceService, err := common.NewUnsafeSecretService(logger, client) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appstripeService, err := common.NewAppStripeService(logger, client, appsConfiguration, service, customerService, secretserviceService) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespacedTopicResolver, err := common.NewNamespacedTopicResolver(kafkaIngestConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v4 := common.NewNamespaceHandlers(namespaceHandler, connector) + namespaceConfiguration := conf.Namespace + manager, err := common.NewNamespaceManager(v4, namespaceConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + adapter, err := common.BillingAdapter(logger, client) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + billingConfiguration := conf.Billing + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) + billingService, err := common.BillingService(logger, client, service, appstripeService, adapter, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector, eventbusPublisher) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appSandboxProvisioner, err := common.NewAppSandboxProvisioner(ctx, logger, appsConfiguration, service, manager, billingService) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + autoAdvancer, err := common.NewBillingAutoAdvancer(logger, billingService) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + producer, err := common.NewKafkaProducer(kafkaIngestConfiguration, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + metrics, err := common.NewKafkaMetrics(meter) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + productCatalogConfiguration := conf.ProductCatalog + planService, err := common.NewPlanService(logger, client, productCatalogConfiguration, featureConnector) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + validator, err := common.BillingSubscriptionValidator(billingService, billingConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + subscriptionServiceWithWorkflow, err := common.NewSubscriptionServices(logger, client, productCatalogConfiguration, entitlementsConfiguration, featureConnector, entitlement, customerService, planService, eventbusPublisher, validator) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + application := Application{ + GlobalInitializer: globalInitializer, + Migrator: migrator, + App: service, + AppStripe: appstripeService, + AppSandboxProvisioner: appSandboxProvisioner, + Customer: customerService, + Billing: billingService, + BillingAutoAdvancer: autoAdvancer, + EntClient: client, + EventPublisher: eventbusPublisher, + EntitlementRegistry: entitlement, + FeatureConnector: featureConnector, + KafkaProducer: producer, + KafkaMetrics: metrics, + Logger: logger, + MeterRepository: inMemoryRepository, + NamespaceHandlers: v4, + NamespaceManager: manager, + Meter: meter, + Plan: planService, + Secret: secretserviceService, + Subscription: subscriptionServiceWithWorkflow, + StreamingConnector: connector, + } + return application, func() { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + }, nil +} + +// wire.go: + +type Application struct { + common.GlobalInitializer + common.Migrator + + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Customer customer.Service + Billing billing.Service + BillingAutoAdvancer *billingworkeradvance.AutoAdvancer + EntClient *db.Client + EventPublisher eventbus.Publisher + EntitlementRegistry *registry.Entitlement + FeatureConnector feature.FeatureConnector + KafkaProducer *kafka2.Producer + KafkaMetrics *metrics.Metrics + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Meter metric.Meter + Plan plan.Service + Secret secret.Service + Subscription common.SubscriptionServiceWithWorkflow + StreamingConnector streaming.Connector +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "jobs") +} diff --git a/cmd/jobs/main.go b/cmd/jobs/main.go index 8da62c517..bee867f7f 100644 --- a/cmd/jobs/main.go +++ b/cmd/jobs/main.go @@ -1,58 +1,58 @@ package main import ( + "context" "log/slog" "os" + "os/signal" + "syscall" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/openmeterio/openmeter/cmd/jobs/billing" - "github.com/openmeterio/openmeter/cmd/jobs/config" "github.com/openmeterio/openmeter/cmd/jobs/entitlement" - "github.com/openmeterio/openmeter/cmd/jobs/service" + "github.com/openmeterio/openmeter/cmd/jobs/internal" ) -const ( - otelName = "openmeter.io/jobs" -) +var configFileName string -func main() { - var telemetry *service.Telemetry +var rootCmd = cobra.Command{ + Use: "jobs", + SilenceUsage: true, + SilenceErrors: true, +} - defer func() { - if telemetry != nil && telemetry.Shutdown != nil { - telemetry.Shutdown() +func main() { + // Create os.Signal aware context.Context which will trigger context cancellation + // upon receiving any of the listed signals. + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM) + defer cancel() + + err := internal.InitializeApplication(ctx, configFileName) + if err != nil { + slog.Error("failed to initialize application", "error", err) + + // Call cleanup function is may not set yet + if internal.AppShutdown != nil { + internal.AppShutdown() } - }() - - rootCmd := cobra.Command{ - Use: "jobs", - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - conf, err := config.LoadConfig(cmd.Flag("config").Value.String()) - if err != nil { - return err - } - - config.SetConfig(conf) - - telemetry, err = service.NewTelemetry(cmd.Context(), conf.Telemetry, conf.Environment, version, otelName) - return err - }, - SilenceErrors: true, + + os.Exit(1) } + defer internal.AppShutdown() - var configFileName string + if err = rootCmd.ExecuteContext(ctx); err != nil { + slog.Error("failed to execute command", "error", err) + os.Exit(1) + } +} +func init() { rootCmd.PersistentFlags().StringVarP(&configFileName, "config", "", "config.yaml", "config file (default is config.yaml)") _ = viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config")) rootCmd.AddCommand(versionCommand()) rootCmd.AddCommand(entitlement.RootCommand()) rootCmd.AddCommand(billing.Cmd) - - if err := rootCmd.Execute(); err != nil { - slog.Error("failed to execute command", "error", err) - os.Exit(1) - } } diff --git a/cmd/jobs/service/otel.go b/cmd/jobs/service/otel.go deleted file mode 100644 index b8bd2a535..000000000 --- a/cmd/jobs/service/otel.go +++ /dev/null @@ -1,95 +0,0 @@ -package service - -import ( - "context" - "fmt" - "log/slog" - "os" - "time" - - "github.com/go-slog/otelslog" - slogmulti "github.com/samber/slog-multi" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - - "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/pkg/contextx" -) - -const ( - defaultShutdownTimeout = 5 * time.Second -) - -type Telemetry struct { - Logger *slog.Logger - MetricMeter metric.Meter - Shutdown func() -} - -func NewTelemetry(ctx context.Context, conf config.TelemetryConfig, env string, version string, otelName string) (*Telemetry, error) { - extraResources, _ := resource.New( - context.Background(), - resource.WithContainer(), - resource.WithAttributes( - semconv.ServiceName("openmeter"), - semconv.ServiceVersion(version), - semconv.DeploymentEnvironment(env), - ), - ) - res, _ := resource.Merge( - resource.Default(), - extraResources, - ) - - logger := slog.New(slogmulti.Pipe( - otelslog.ResourceMiddleware(res), - otelslog.NewHandler, - contextx.NewLogHandler, - ).Handler(conf.Log.NewHandler(os.Stdout))) - - slog.SetDefault(logger) - - // Initialize OTel Metrics - otelMeterProvider, err := conf.Metrics.NewMeterProvider(ctx, res) - if err != nil { - logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error())) - os.Exit(1) - } - defer func() { - // Use dedicated context with timeout for shutdown as parent context might be canceled - // by the time the execution reaches this stage. - }() - otel.SetMeterProvider(otelMeterProvider) - metricMeter := otelMeterProvider.Meter(otelName) - - // Initialize OTel Tracer - otelTracerProvider, err := conf.Trace.NewTracerProvider(ctx, res) - if err != nil { - return nil, fmt.Errorf("failed to initialize OpenTelemetry Trace provider: %w", err) - } - - otel.SetTracerProvider(otelTracerProvider) - otel.SetTextMapPropagator(propagation.TraceContext{}) - - return &Telemetry{ - Logger: logger, - MetricMeter: metricMeter, - Shutdown: func() { - // Use dedicated context with timeout for shutdown as parent context might be canceled - // by the time the execution reaches this stage. - ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) - defer cancel() - - if err := otelMeterProvider.Shutdown(ctx); err != nil { - logger.Error("shutting down meter provider", slog.String("error", err.Error())) - } - - if err := otelTracerProvider.Shutdown(ctx); err != nil { - logger.Error("shutting down tracer provider", slog.String("error", err.Error())) - } - }, - }, nil -} diff --git a/cmd/jobs/version.go b/cmd/jobs/version.go index 973b1f242..f8ec374a4 100644 --- a/cmd/jobs/version.go +++ b/cmd/jobs/version.go @@ -1,47 +1,17 @@ package main import ( - "runtime/debug" - "github.com/spf13/cobra" -) - -// Provisioned by ldflags. -var version string -//nolint:gochecknoglobals -var ( - revision string - revisionDate string + "github.com/openmeterio/openmeter/cmd/jobs/internal" ) -//nolint:gochecknoinits,goconst -func init() { - if version == "" { - version = "unknown" - } - - buildInfo, _ := debug.ReadBuildInfo() - - revision = "unknown" - revisionDate = "unknown" - - for _, setting := range buildInfo.Settings { - if setting.Key == "vcs.revision" { - revision = setting.Value - } - - if setting.Key == "vcs.time" { - revisionDate = setting.Value - } - } -} - func versionCommand() *cobra.Command { return &cobra.Command{ Use: "version", Short: "Print the version", Run: func(cmd *cobra.Command, args []string) { + version, revision, revisionDate := internal.Version() cmd.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate) }, }