From f4f4f84b7a1eeff1caa0f90abc57e72cd4e895d1 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Thu, 6 Mar 2025 14:01:31 -0800 Subject: [PATCH 1/3] Revert "fix: addOnDemandPayment after validatePayment" This reverts commit 288be60b316d7f83cb04a65a4f06e4c312999821. --- core/meterer/meterer.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index c230eb172e..e489accc86 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -214,7 +214,10 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM // Compute paymentCharged at payment time paymentCharged := PaymentCharged(symbolsCharged, m.ChainPaymentState.GetPricePerSymbol()) - + err = m.OffchainStore.AddOnDemandPayment(ctx, header, paymentCharged) + if err != nil { + return fmt.Errorf("failed to update cumulative payment: %w", err) + } // Validate payments attached err = m.ValidatePayment(ctx, header, onDemandPayment, paymentCharged) if err != nil { @@ -222,11 +225,6 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM return fmt.Errorf("invalid on-demand payment: %w", err) } - err = m.OffchainStore.AddOnDemandPayment(ctx, header, paymentCharged) - if err != nil { - return fmt.Errorf("failed to update cumulative payment: %w", err) - } - // Update bin usage atomically and check against bin capacity if err := m.IncrementGlobalBinUsage(ctx, uint64(symbolsCharged), receivedAt); err != nil { //TODO: conditionally remove the payment based on the error type (maybe if the error is store-op related) From 76068bbf7baf5244d28fd8349d159771ec6728d6 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 7 Mar 2025 02:41:56 -0800 Subject: [PATCH 2/3] Revert "chore: update unit test logic" This reverts commit e7b45eba08bba36b8e138300bfdff66d4761a709. --- core/meterer/meterer_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index afd3d98915..fd6495faa8 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -313,13 +313,13 @@ func TestMetererOnDemand(t *testing.T) { header = createPaymentHeader(now.UnixNano(), big.NewInt(1), accountID1) _, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers, now) assert.ErrorContains(t, err, "insufficient cumulative payment increment") - // Not record for invalid payment + // No rollback after meter request result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: accountID1.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, 0, len(result)) + assert.Equal(t, 1, len(result)) // test duplicated cumulative payments symbolLength := uint64(100) @@ -335,8 +335,7 @@ func TestMetererOnDemand(t *testing.T) { assert.ErrorContains(t, err, "exact payment already exists") // test valid payments - numValidPayments := 9 - for i := 1; i < numValidPayments; i++ { + for i := 1; i < 9; i++ { header = createPaymentHeader(now.UnixNano(), new(big.Int).Mul(priceCharged, big.NewInt(int64(i+1))), accountID2) symbolsCharged, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers, now) assert.NoError(t, err) @@ -364,12 +363,13 @@ func TestMetererOnDemand(t *testing.T) { _, err = mt.MeterRequest(ctx, *header, 50, quorumNumbers, now) assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") + numPrevRecords := 12 result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: accountID2.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, numValidPayments, len(result)) + assert.Equal(t, numPrevRecords, len(result)) // test failed global rate limit (previously payment recorded: 2, global limit: 1009) header = createPaymentHeader(now.UnixNano(), big.NewInt(0).Add(previousCumulativePayment, meterer.PaymentCharged(1010, mt.ChainPaymentState.GetPricePerSymbol())), accountID1) _, err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers, now) @@ -380,7 +380,7 @@ func TestMetererOnDemand(t *testing.T) { Value: accountID2.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, numValidPayments, len(result)) + assert.Equal(t, numPrevRecords, len(result)) } func TestPaymentCharged(t *testing.T) { From 40679f9b52760e7ebeb98cbc57c7cd44b6c5abff Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 7 Mar 2025 09:17:24 -0800 Subject: [PATCH 3/3] feat: on-demand rollback for invalid payments --- core/meterer/meterer.go | 6 ++++-- core/meterer/meterer_test.go | 23 +++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index e489accc86..d1cfabe4a5 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -221,13 +221,15 @@ func (m *Meterer) ServeOnDemandRequest(ctx context.Context, header core.PaymentM // Validate payments attached err = m.ValidatePayment(ctx, header, onDemandPayment, paymentCharged) if err != nil { - // No tolerance for incorrect payment amounts; no rollbacks + dbErr := m.OffchainStore.RemoveOnDemandPayment(ctx, header.AccountID, header.CumulativePayment) + if dbErr != nil { + return dbErr + } return fmt.Errorf("invalid on-demand payment: %w", err) } // Update bin usage atomically and check against bin capacity if err := m.IncrementGlobalBinUsage(ctx, uint64(symbolsCharged), receivedAt); err != nil { - //TODO: conditionally remove the payment based on the error type (maybe if the error is store-op related) dbErr := m.OffchainStore.RemoveOnDemandPayment(ctx, header.AccountID, header.CumulativePayment) if dbErr != nil { return dbErr diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index fd6495faa8..a0bb5c995a 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -313,13 +313,13 @@ func TestMetererOnDemand(t *testing.T) { header = createPaymentHeader(now.UnixNano(), big.NewInt(1), accountID1) _, err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers, now) assert.ErrorContains(t, err, "insufficient cumulative payment increment") - // No rollback after meter request + // rollback for invalid payments result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: accountID1.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, 1, len(result)) + assert.Equal(t, 0, len(result)) // test duplicated cumulative payments symbolLength := uint64(100) @@ -335,7 +335,8 @@ func TestMetererOnDemand(t *testing.T) { assert.ErrorContains(t, err, "exact payment already exists") // test valid payments - for i := 1; i < 9; i++ { + numValidPayments := 9 + for i := 1; i < numValidPayments; i++ { header = createPaymentHeader(now.UnixNano(), new(big.Int).Mul(priceCharged, big.NewInt(int64(i+1))), accountID2) symbolsCharged, err = mt.MeterRequest(ctx, *header, symbolLength, quorumNumbers, now) assert.NoError(t, err) @@ -360,27 +361,33 @@ func TestMetererOnDemand(t *testing.T) { // test cannot insert cumulative payment in out of order symbolsCharged = mt.SymbolsCharged(uint64(50)) header = createPaymentHeader(now.UnixNano(), meterer.PaymentCharged(symbolsCharged, mt.ChainPaymentState.GetPricePerSymbol()), accountID2) - _, err = mt.MeterRequest(ctx, *header, 50, quorumNumbers, now) + _, err = mt.MeterRequest(ctx, *header, 1, quorumNumbers, now) assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") - numPrevRecords := 12 result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: accountID2.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, numPrevRecords, len(result)) + assert.Equal(t, numValidPayments, len(result)) + + // with rollback of invalid payments, users cannot cheat by inserting an invalid cumulative payment + symbolsCharged = mt.SymbolsCharged(uint64(30)) + header = createPaymentHeader(now.UnixNano(), meterer.PaymentCharged(symbolsCharged, mt.ChainPaymentState.GetPricePerSymbol()), accountID2) + _, err = mt.MeterRequest(ctx, *header, 30, quorumNumbers, now) + assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") + // test failed global rate limit (previously payment recorded: 2, global limit: 1009) header = createPaymentHeader(now.UnixNano(), big.NewInt(0).Add(previousCumulativePayment, meterer.PaymentCharged(1010, mt.ChainPaymentState.GetPricePerSymbol())), accountID1) _, err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers, now) assert.ErrorContains(t, err, "failed global rate limiting") - // Correct rollback + // Correct rollback when exceeding global rate limit result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: accountID2.Hex(), }}) assert.NoError(t, err) - assert.Equal(t, numPrevRecords, len(result)) + assert.Equal(t, numValidPayments, len(result)) } func TestPaymentCharged(t *testing.T) {