Skip to content

Commit

Permalink
Optimize query for reaping lookup tables
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jul 18, 2024
1 parent 0e83215 commit be19207
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 76 deletions.
110 changes: 51 additions & 59 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,11 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_transaction_participants",
objectField: "history_account_id",
},

{
name: "history_effects",
objectField: "history_account_id",
Expand All @@ -1010,10 +1015,6 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_assets": {
{
Expand All @@ -1035,34 +1036,31 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_transaction_claimable_balances",
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_operation_liquidity_pools",
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_transaction_liquidity_pools",
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
} {
startTime := time.Now()
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
}
query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))

Check failure on line 1063 in services/horizon/internal/db2/history/main.go

View workflow job for this annotation

GitHub Actions / golangci

q.GetRaw undefined (type Q has no field or method GetRaw) (typecheck)
if err != nil {
if q.NoRows(err) {
newOffset = 0
Expand Down Expand Up @@ -1098,17 +1096,24 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
//
// delete from history_claimable_balances where id in
//
// (select id from
// (select id,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
// (SELECT e1.id FROM (
// SELECT id FROM history_claimable_balances
// WHERE id >= 1000
// ORDER BY id LIMIT 1000
// ) e1 LEFT JOIN LATERAL (
// SELECT 1 AS row
// FROM history_transaction_claimable_balances
// where history_transaction_claimable_balances.history_claimable_balance_id = e1.id
// LIMIT 1
// ) e2 ON true LEFT JOIN LATERAL (
// SELECT 1 AS row
// FROM history_operation_claimable_balances
// where history_operation_claimable_balances.history_claimable_balance_id = e1.id
// LIMIT 1
// ) e3 ON true
// WHERE e2.row IS NULL AND e3.row IS NULL);
//
// In short it checks the 100 rows omitting 1000 row of history_claimable_balances
// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances
// and counts occurrences of each row in corresponding history tables.
// If there are no history rows for a given id, the row in
// history_claimable_balances is removed.
Expand All @@ -1118,45 +1123,32 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) {
var sb strings.Builder
var err error
_, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table)
if err != nil {
return "", err
}

for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
historyTable.name,
historyTable.objectField,
i,
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) string {
index := 2
var joins []string
var conditions []string

for _, historyTable := range historyTables {
joins = append(
joins,
fmt.Sprintf(
` LEFT JOIN LATERAL ( SELECT 1 as row FROM %s WHERE %s.%s = e1.id LIMIT 1) e%d ON true`,
historyTable.name,
historyTable.name, historyTable.objectField,
index,
),
)
if err != nil {
return "", err
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
conditions = append(conditions, fmt.Sprintf("e%d.row IS NULL", index))
index++
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
}

_, err = sb.WriteString("1=1);")
if err != nil {
return "", err
}

return sb.String(), nil
return fmt.Sprintf(
"DELETE FROM %s WHERE id IN (SELECT e1.id FROM (SELECT id FROM %s WHERE id >= %d ORDER BY id LIMIT %d) e1",
table,
table,
offset,
batchSize,
) + strings.Join(joins, "") + fmt.Sprintf(" WHERE %s);", strings.Join(conditions, " AND "))
}

// DeleteRangeAll deletes a range of rows from all history tables between
Expand Down
39 changes: 22 additions & 17 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"testing"
"time"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/go/services/horizon/internal/test"
)

func TestLatestLedger(t *testing.T) {
Expand Down Expand Up @@ -70,9 +70,13 @@ func TestElderLedger(t *testing.T) {
}

func TestConstructReapLookupTablesQuery(t *testing.T) {
query, err := constructReapLookupTablesQuery(
query := constructReapLookupTablesQuery(
"history_accounts",
[]tableObjectFieldPair{
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
{
name: "history_effects",
objectField: "history_account_id",
Expand All @@ -89,24 +93,25 @@ func TestConstructReapLookupTablesQuery(t *testing.T) {
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
10,
0,
)

require.NoError(t, err)
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+
"(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+
"(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+
"(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+
"(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+
"1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+
"where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query)
"DELETE FROM history_accounts WHERE id IN ("+
"SELECT e1.id FROM ("+
"SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id LIMIT 10) e1 "+
"LEFT JOIN LATERAL ( "+
"SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = e1.id LIMIT 1"+
") e2 ON true LEFT JOIN LATERAL ( "+
"SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = e1.id LIMIT 1"+
") e3 ON true LEFT JOIN LATERAL ( "+
"SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = e1.id LIMIT 1"+
") e4 ON true LEFT JOIN LATERAL ( "+
"SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = e1.id LIMIT 1"+
") e5 ON true LEFT JOIN LATERAL ( "+
"SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = e1.id LIMIT 1"+
") e6 ON true "+
"WHERE e2.row IS NULL AND e3.row IS NULL AND e4.row IS NULL AND e5.row IS NULL AND e6.row IS NULL);", query)
}

0 comments on commit be19207

Please sign in to comment.