Skip to content

Commit

Permalink
expiration uses db timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
matYang committed May 7, 2024
1 parent f36ebac commit 90de77d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 30 deletions.
22 changes: 10 additions & 12 deletions core/services/ccip/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions core/services/ccip/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type ORM interface {
InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error
InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error

ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error
ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error
}

type orm struct {
Expand Down Expand Up @@ -109,6 +109,7 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at)
VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
Expand All @@ -134,6 +135,7 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
})
}

// using statement_timestamp() to make testing easier
stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at)
VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());`
_, err := o.ds.NamedExecContext(ctx, stmt, insertData)
Expand All @@ -144,16 +146,16 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect
return err
}

func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}

func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < $2`
func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error {
stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')`

_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to)
_, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec)
return err
}
22 changes: 12 additions & 10 deletions core/services/ccip/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerSelector := range updates {
Expand All @@ -222,13 +223,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) {

assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getGasTableRowCount(t, db))
}
Expand Down Expand Up @@ -324,7 +325,8 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {
assert.NoError(t, err)
}

interimTimeStamp := time.Now()
sleepSec := 2
time.Sleep(time.Duration(sleepSec) * time.Second)

// insert for the 2nd time after interimTimeStamp
for _, updatesPerAddr := range updates {
Expand All @@ -334,13 +336,13 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) {

assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by interimTimeStamp should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, interimTimeStamp)
// clear by sleepSec should delete rows inserted before it
err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec)
assert.NoError(t, err)
assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db))

// clear by Now() should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, time.Now())
// clear by 0 expiration seconds should delete all rows
err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0)
assert.NoError(t, err)
assert.Equal(t, 0, getTokenTableRowCount(t, db))
}

0 comments on commit 90de77d

Please sign in to comment.