Skip to content

Commit

Permalink
feat(billing): implement invoice collector (#2179)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal authored Jan 31, 2025
1 parent 5a5f911 commit 627173e
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 21 deletions.
8 changes: 8 additions & 0 deletions app/common/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
billingsubscription "github.com/openmeterio/openmeter/openmeter/billing/subscription"
billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance"
billingworkercollect "github.com/openmeterio/openmeter/openmeter/billing/worker/collect"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
Expand Down Expand Up @@ -82,3 +83,10 @@ func NewBillingAutoAdvancer(logger *slog.Logger, service billing.Service) (*bill
Logger: logger,
})
}

func NewBillingCollector(logger *slog.Logger, service billing.Service) (*billingworkercollect.InvoiceCollector, error) {
return billingworkercollect.NewInvoiceCollector(billingworkercollect.Config{
BillingService: service,
Logger: logger,
})
}
2 changes: 2 additions & 0 deletions cmd/jobs/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/spf13/cobra"

"github.com/openmeterio/openmeter/cmd/jobs/billing/advance"
"github.com/openmeterio/openmeter/cmd/jobs/billing/collect"
)

var Cmd = &cobra.Command{
Expand All @@ -13,4 +14,5 @@ var Cmd = &cobra.Command{

func init() {
Cmd.AddCommand(advance.Cmd)
Cmd.AddCommand(collect.Cmd)
}
102 changes: 102 additions & 0 deletions cmd/jobs/billing/collect/collect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package collect

import (
"fmt"
"time"

"github.com/spf13/cobra"

"github.com/openmeterio/openmeter/cmd/jobs/internal"
billingworkercollect "github.com/openmeterio/openmeter/openmeter/billing/worker/collect"
)

var (
namespaces []string
customerIDs []string
invoiceIDs []string
)

var Cmd = &cobra.Command{
Use: "collect",
Short: "Invoice collection operations",
}

func init() {
Cmd.AddCommand(ListCmd())
Cmd.AddCommand(InvoiceCmd())
Cmd.AddCommand(AllCmd())
}

var ListCmd = func() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List gathering invoices which can be collected",
RunE: func(cmd *cobra.Command, args []string) error {
invoices, err := internal.App.BillingCollector.ListCollectableInvoices(cmd.Context(),
billingworkercollect.ListCollectableInvoicesInput{
Namespaces: namespaces,
InvoiceIDs: invoiceIDs,
Customers: customerIDs,
CollectionAt: time.Now(),
})
if err != nil {
return err
}

for _, invoice := range invoices {
fmt.Printf("Namespace: %s ID: %s CollectAt: %s\n", invoice.Namespace, invoice.ID, invoice.CollectionAt)
}

return nil
},
}

cmd.PersistentFlags().StringSliceVar(&namespaces, "n", nil, "filter by namespaces")
cmd.PersistentFlags().StringSliceVar(&customerIDs, "c", nil, "filter by customer ids")
cmd.PersistentFlags().StringSliceVar(&invoiceIDs, "i", nil, "filter by invoice ids")

return cmd
}

var InvoiceCmd = func() *cobra.Command {
cmd := &cobra.Command{
Use: "invoice [CUSTOMER_ID]",
Short: "Create new invoice(s) for customer(s)",
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
for _, customerID := range args {
_, err := internal.App.BillingCollector.CollectCustomerInvoice(cmd.Context(),
billingworkercollect.CollectCustomerInvoiceInput{
CustomerID: customerID,
AsOf: nil,
},
)
if err != nil {
return fmt.Errorf("failed to advance invoice %s: %w", customerID, err)
}
}

return nil
},
}

return cmd
}

var batchSize int

var AllCmd = func() *cobra.Command {
cmd := &cobra.Command{
Use: "all",
Short: "Advance all eligible invoices",
RunE: func(cmd *cobra.Command, args []string) error {
return internal.App.BillingCollector.All(cmd.Context(), namespaces, customerIDs, batchSize)
},
}

cmd.PersistentFlags().StringSliceVar(&namespaces, "n", nil, "filter by namespaces")
cmd.PersistentFlags().StringSliceVar(&customerIDs, "c", nil, "filter by customer ids")
cmd.PersistentFlags().IntVar(&batchSize, "batch", 0, "operation batch size")

return cmd
}
3 changes: 3 additions & 0 deletions cmd/jobs/internal/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
"github.com/openmeterio/openmeter/openmeter/billing"
billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance"
billingworkercollect "github.com/openmeterio/openmeter/openmeter/billing/worker/collect"
"github.com/openmeterio/openmeter/openmeter/customer"
"github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
Expand All @@ -40,6 +41,7 @@ type Application struct {
Customer customer.Service
Billing billing.Service
BillingAutoAdvancer *billingworkerautoadvance.AutoAdvancer
BillingCollector *billingworkercollect.InvoiceCollector
EntClient *db.Client
EventPublisher eventbus.Publisher
EntitlementRegistry *registry.Entitlement
Expand Down Expand Up @@ -73,6 +75,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl
common.MeterInMemory,
common.Namespace,
common.NewBillingAutoAdvancer,
common.NewBillingCollector,
common.NewDefaultTextMapPropagator,
common.NewServerPublisher,
common.Streaming,
Expand Down
13 changes: 13 additions & 0 deletions cmd/jobs/internal/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 34 additions & 14 deletions openmeter/billing/service/collectionat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,33 @@ func UpdateInvoiceCollectionAt(invoice *billing.Invoice, collection billing.Coll
return false
}

invoiceAt := GetEarliestValidInvoiceAt(invoice.Lines)

if invoiceAt.IsZero() {
return false
}

interval, ok := collection.Interval.Duration()
if !ok {
return false
}

collectionAt := invoiceAt.Add(interval)

if lo.FromPtr(invoice.CollectionAt).Equal(collectionAt) {
return false
}

invoice.CollectionAt = &collectionAt

return true
}

func GetEarliestValidInvoiceAt(lines billing.LineChildren) time.Time {
var invoiceAt time.Time

// Find the invoice lint with the earliest invoiceAt attribute
invoice.Lines.ForEach(func(v []*billing.Line) {
lines.ForEach(func(v []*billing.Line) {
for _, line := range v {
if line == nil || line.Status != billing.InvoiceLineStatusValid {
continue
Expand All @@ -41,22 +64,19 @@ func UpdateInvoiceCollectionAt(invoice *billing.Invoice, collection billing.Coll
}
})

if invoiceAt.IsZero() {
return false
}
return invoiceAt
}

interval, ok := collection.Interval.Duration()
if !ok {
return false
}
func GetInvoiceWithEarliestCollectionAt(invoices []billing.Invoice) billing.Invoice {
var idx int

collectionAt := invoiceAt.Add(interval)
collectAt := time.Now()

if lo.FromPtr(invoice.CollectionAt).Equal(collectionAt) {
return false
for i, invoice := range invoices {
if invoice.CollectionAt.Before(collectAt) {
idx = i
}
}

invoice.CollectionAt = &collectionAt

return true
return invoices[idx]
}
69 changes: 62 additions & 7 deletions openmeter/billing/service/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,23 +334,78 @@ func (s *Service) InvoicePendingLines(ctx context.Context, input billing.Invoice
return nil, fmt.Errorf("cleanup: line counts check: %w", err)
}

invoicesWithoutLines := lo.Filter(sourceInvoiceIDs, func(id string, _ int) bool {
return invoiceLineCounts.Counts[billing.InvoiceID{
// Collect gathering invoices which can be deleted and which needs to have their collectionAt updated
// due to still having live items.
liveGatheringInvoiceIDs := make([]string, 0, len(sourceInvoiceIDs))
emptyGatheringInvoiceIDs := make([]string, 0, len(sourceInvoiceIDs))

for _, invoiceID := range sourceInvoiceIDs {
invoiceNamespacedID := billing.InvoiceID{
Namespace: input.Customer.Namespace,
ID: id,
}] == 0
})
ID: invoiceID,
}

if invoiceLineCounts.Counts[invoiceNamespacedID] == 0 {
emptyGatheringInvoiceIDs = append(emptyGatheringInvoiceIDs, invoiceID)
} else {
liveGatheringInvoiceIDs = append(liveGatheringInvoiceIDs, invoiceID)
}
}

if len(invoicesWithoutLines) > 0 {
// Delete empty gathering invoices
if len(emptyGatheringInvoiceIDs) > 0 {
err = s.adapter.DeleteInvoices(ctx, billing.DeleteInvoicesAdapterInput{
Namespace: input.Customer.Namespace,
InvoiceIDs: invoicesWithoutLines,
InvoiceIDs: emptyGatheringInvoiceIDs,
})
if err != nil {
return nil, fmt.Errorf("cleanup invoices: %w", err)
}
}

// Update collectionAt for live gathering invoices
if len(liveGatheringInvoiceIDs) > 0 {
resp, err := s.ListInvoices(ctx, billing.ListInvoicesInput{
Customers: []string{input.Customer.ID},
IDs: liveGatheringInvoiceIDs,
ExtendedStatuses: []billing.InvoiceStatus{billing.InvoiceStatusGathering},
Expand: billing.InvoiceExpand{
Lines: true,
},
})
if err != nil {
return nil, fmt.Errorf("failed to get gathering invoice(s) for customer [customer=%s]: %w",
input.Customer.ID, err,
)
}

for _, invoice := range resp.Items {
collectionAt := invoice.CollectionAt
if ok := UpdateInvoiceCollectionAt(&invoice, customerProfile.Profile.WorkflowConfig.Collection); ok {
s.logger.DebugContext(ctx, "collection time updated for invoice",
"invoiceID", invoice.ID,
"collectionAt", map[string]interface{}{
"from": lo.FromPtr(collectionAt),
"to": lo.FromPtr(invoice.CollectionAt),
"collectionInterval": customerProfile.Profile.WorkflowConfig.Collection.Interval.String(),
},
)
}

if err = invoice.Validate(); err != nil {
return nil, billing.ValidationError{
Err: err,
}
}

if _, err = s.updateInvoice(ctx, invoice); err != nil {
return nil, fmt.Errorf("failed to update gathering invoice [namespace=%s invoice=%s, customer=%s]: %w",
input.Customer.Namespace, invoice.ID, input.Customer.ID, err,
)
}
}
}

// Assemble output: we need to refetch as the association call will have side-effects of updating
// invoice objects (e.g. totals, period, etc.)
out := make([]billing.Invoice, 0, len(createdInvoices))
Expand Down
Loading

0 comments on commit 627173e

Please sign in to comment.