diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index 9e15920609..f69e7d7f48 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -2,29 +2,10 @@ package history import ( "cmp" - "context" - "database/sql/driver" - "fmt" - "sort" - "strings" - - "github.com/lib/pq" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" ) -var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") - -// LoaderStats describes the result of executing a history lookup id Loader -type LoaderStats struct { - // Total is the number of elements registered to the Loader - Total int - // Inserted is the number of elements inserted into the lookup table - Inserted int -} - // FutureAccountID represents a future history account. // A FutureAccountID is created by an AccountLoader and // the account id is available after calling Exec() on @@ -38,7 +19,7 @@ type FutureAccountID = future[string, Account] type AccountLoader = loader[string, Account] // NewAccountLoader will construct a new AccountLoader instance. -func NewAccountLoader() *AccountLoader { +func NewAccountLoader(concurrencyMode ConcurrencyMode) *AccountLoader { return &AccountLoader{ sealed: false, set: set.Set[string]{}, @@ -58,287 +39,11 @@ func NewAccountLoader() *AccountLoader { mappingFromRow: func(account Account) (string, int64) { return account.Address, account.ID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } -type loader[K comparable, T any] struct { - sealed bool - set set.Set[K] - ids map[K]int64 - stats LoaderStats - name string - table string - columnsForKeys func([]K) []columnValues - mappingFromRow func(T) (K, int64) - less func(K, K) bool -} - -type future[K comparable, T any] struct { - key K - loader *loader[K, T] -} - -// Value implements the database/sql/driver Valuer interface. -func (f future[K, T]) Value() (driver.Value, error) { - return f.loader.GetNow(f.key) -} - -// GetFuture registers the given key into the Loader and -// returns a future which will hold the history id for -// the key after Exec() is called. -func (l *loader[K, T]) GetFuture(key K) future[K, T] { - if l.sealed { - panic(errSealed) - } - - l.set.Add(key) - return future[K, T]{ - key: key, - loader: l, - } -} - -// GetNow returns the history id for the given key. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (l *loader[K, T]) GetNow(key K) (int64, error) { - if !l.sealed { - return 0, fmt.Errorf(`invalid loader state, - Exec was not called yet to properly seal and resolve %v id`, key) - } - if internalID, ok := l.ids[key]; !ok { - return 0, fmt.Errorf(`loader key %v was not found`, key) - } else { - return internalID, nil - } -} - -// Exec will look up all the history ids for the keys registered in the Loader. -// If there are no history ids for a given set of keys, Exec will insert rows -// into the corresponding history table to establish a mapping between each key and its history id. 
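For orientation, the loader lifecycle after this change is: construct with a ConcurrencyMode, register keys via GetFuture, call Exec inside the ingestion transaction, then resolve ids via GetNow or the futures. A minimal sketch follows; the helper name and session wiring are illustrative, not part of this patch.

package example

import (
	"context"

	"github.com/stellar/go/services/horizon/internal/db2/history"
	"github.com/stellar/go/support/db"
)

// resolveAccountID sketches the loader lifecycle: register keys with
// GetFuture, call Exec inside the ingestion transaction, then read the
// resolved history ids.
func resolveAccountID(ctx context.Context, session db.SessionInterface, address string) (int64, error) {
	// ConcurrentDeletes is the mode intended for live ingestion, where the
	// lookup table reaper may run concurrently with the loader.
	loader := history.NewAccountLoader(history.ConcurrentDeletes)

	// Futures can be handed to batch insert builders before any database
	// work happens; they only resolve after Exec.
	loader.GetFuture(address)

	// Exec inserts any missing history_accounts rows and resolves every
	// registered key to its history id.
	if err := loader.Exec(ctx, session); err != nil {
		return 0, err
	}
	return loader.GetNow(address)
}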
-func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { - l.sealed = true - if len(l.set) == 0 { - return nil - } - q := &Q{session} - keys := make([]K, 0, len(l.set)) - for key := range l.set { - keys = append(keys, key) - } - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - return l.less(keys[i], keys[j]) - }) - - if count, err := l.insert(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - l.stats.Inserted += count - } - - if count, err := l.query(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - } - - return nil -} - -// Stats returns the number of addresses registered in the Loader and the number of rows -// inserted into the history table. -func (l *loader[K, T]) Stats() LoaderStats { - return l.stats -} - -func (l *loader[K, T]) Name() string { - return l.name -} - -func (l *loader[K, T]) filter(keys []K) []K { - if len(l.ids) == 0 { - return keys - } - - remaining := make([]K, 0, len(keys)) - for _, key := range keys { - if _, ok := l.ids[key]; ok { - continue - } - remaining = append(remaining, key) - } - return remaining -} - -func (l *loader[K, T]) updateMap(rows []T) { - for _, row := range rows { - key, id := l.mappingFromRow(row) - l.ids[key] = id - } -} - -func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { - keys = l.filter(keys) - if len(keys) == 0 { - return 0, nil - } - - var rows []T - err := bulkInsert( - ctx, - q, - l.table, - l.columnsForKeys(keys), - &rows, - ) - if err != nil { - return 0, err - } - - l.updateMap(rows) - return len(rows), nil -} - -func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { - keys = l.filter(keys) - if len(keys) == 0 { - return 0, nil - } - - var rows []T - err := bulkGet( - ctx, - q, - l.table, - l.columnsForKeys(keys), - &rows, - ) - if err != nil { - return 0, err - } - - l.updateMap(rows) - return len(rows), nil -} - -type columnValues struct { - name string - dbType string - objects []string -} - -func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { - unnestPart := make([]string, 0, len(fields)) - insertFieldsPart := make([]string, 0, len(fields)) - pqArrays := make([]interface{}, 0, len(fields)) - - // In the code below we are building the bulk insert query which looks like: - // - // WITH rows AS - // (SELECT - // /* unnestPart */ - // unnest(?::type1[]), /* field1 */ - // unnest(?::type2[]), /* field2 */ - // ... - // ) - // INSERT INTO table ( - // /* insertFieldsPart */ - // field1, - // field2, - // ... - // ) - // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * - // - // Using unnest allows to get around the maximum limit of 65,535 query parameters, - // see https://www.postgresql.org/docs/12/limits.html and - // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ - // - // Without using unnest we would have to use multiple insert statements to insert - // all the rows for large datasets. 
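Rendered for a single-column lookup table such as history_accounts, the statement built above comes out roughly as follows. The text parameter type is an assumption for illustration; the loader substitutes each column's declared dbType.

package example

// Approximate output of bulkInsert for history_accounts. pq.Array binds the
// whole slice of addresses as a single array parameter, so the statement
// stays within PostgreSQL's 65,535-parameter limit regardless of batch size.
const exampleBulkInsertSQL = `
	WITH rows AS
	(SELECT unnest(?::text[]) /* address */)
	INSERT INTO history_accounts
	(address)
	SELECT * FROM rows
	ON CONFLICT (address) DO NOTHING
	RETURNING *`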
- for _, field := range fields { - unnestPart = append( - unnestPart, - fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), - ) - insertFieldsPart = append( - insertFieldsPart, - field.name, - ) - pqArrays = append( - pqArrays, - pq.Array(field.objects), - ) - } - columns := strings.Join(insertFieldsPart, ",") - - sql := ` - WITH rows AS - (SELECT ` + strings.Join(unnestPart, ",") + `) - INSERT INTO ` + table + ` - (` + columns + `) - SELECT * FROM rows - ON CONFLICT (` + columns + `) DO NOTHING - RETURNING *` - - return q.SelectRaw( - ctx, - response, - sql, - pqArrays..., - ) -} - -func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { - unnestPart := make([]string, 0, len(fields)) - columns := make([]string, 0, len(fields)) - pqArrays := make([]interface{}, 0, len(fields)) - - // In the code below we are building the bulk get query which looks like: - // - // SELECT * FROM table WHERE (field1, field2, ...) IN - // (SELECT - // /* unnestPart */ - // unnest(?::type1[]), /* field1 */ - // unnest(?::type2[]), /* field2 */ - // ... - // ) - // - // Using unnest allows to get around the maximum limit of 65,535 query parameters, - // see https://www.postgresql.org/docs/12/limits.html and - // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ - // - // Without using unnest we would have to use multiple select statements to obtain - // all the rows for large datasets. - for _, field := range fields { - unnestPart = append( - unnestPart, - fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), - ) - columns = append( - columns, - field.name, - ) - pqArrays = append( - pqArrays, - pq.Array(field.objects), - ) - } - sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN - (SELECT ` + strings.Join(unnestPart, ",") + `)` - - return q.SelectRaw( - ctx, - response, - sql, - pqArrays..., - ) -} - // AccountLoaderStub is a stub wrapper around AccountLoader which allows // you to manually configure the mapping of addresses to history account ids type AccountLoaderStub struct { @@ -347,7 +52,7 @@ type AccountLoaderStub struct { // NewAccountLoaderStub returns a new AccountLoaderStub instance func NewAccountLoaderStub() AccountLoaderStub { - return AccountLoaderStub{Loader: NewAccountLoader()} + return AccountLoaderStub{Loader: NewAccountLoader(ConcurrentInserts)} } // Insert updates the wrapped AccountLoader so that the given account diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 9a9fb30445..83b172b40b 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" ) func TestAccountLoader(t *testing.T) { @@ -16,12 +17,18 @@ func TestAccountLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAccountLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAccountLoader(t, session, ConcurrentDeletes) +} + +func testAccountLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var addresses []string for i := 0; i < 100; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } - loader := NewAccountLoader() + loader := NewAccountLoader(mode) for _, address := range addresses { 
future := loader.GetFuture(address) _, err := future.Value() @@ -58,7 +65,7 @@ func TestAccountLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAccountLoader() + loader = NewAccountLoader(mode) for i := 0; i < 10; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } @@ -85,5 +92,4 @@ func TestAccountLoader(t *testing.T) { assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } - } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index cdd2a0d714..33c5c333dd 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -42,7 +42,7 @@ type FutureAssetID = future[AssetKey, Asset] type AssetLoader = loader[AssetKey, Asset] // NewAssetLoader will construct a new AssetLoader instance. -func NewAssetLoader() *AssetLoader { +func NewAssetLoader(concurrencyMode ConcurrencyMode) *AssetLoader { return &AssetLoader{ sealed: false, set: set.Set[AssetKey]{}, @@ -88,6 +88,7 @@ func NewAssetLoader() *AssetLoader { less: func(a AssetKey, b AssetKey) bool { return a.String() < b.String() }, + concurrencyMode: concurrencyMode, } } @@ -99,7 +100,7 @@ type AssetLoaderStub struct { // NewAssetLoaderStub returns a new AssetLoaderStub instance func NewAssetLoaderStub() AssetLoaderStub { - return AssetLoaderStub{Loader: NewAssetLoader()} + return AssetLoaderStub{Loader: NewAssetLoader(ConcurrentInserts)} } // Insert updates the wrapped AssetLoaderStub so that the given asset diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index ca65cebb7e..e7a0495cad 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -9,6 +9,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -40,6 +41,12 @@ func TestAssetLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAssetLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAssetLoader(t, session, ConcurrentDeletes) +} + +func testAssetLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var keys []AssetKey for i := 0; i < 100; i++ { var key AssetKey @@ -66,7 +73,7 @@ func TestAssetLoader(t *testing.T) { keys = append(keys, key) } - loader := NewAssetLoader() + loader := NewAssetLoader(mode) for _, key := range keys { future := loader.GetFuture(key) _, err := future.Value() @@ -109,7 +116,7 @@ func TestAssetLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAssetLoader() + loader = NewAssetLoader(mode) for i := 0; i < 10; i++ { var key AssetKey if i%2 == 0 { diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index f775ea4b24..9107d4fb9f 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -19,7 +19,7 @@ type FutureClaimableBalanceID = future[string, HistoryClaimableBalance] type ClaimableBalanceLoader = loader[string, 
HistoryClaimableBalance] // NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance. -func NewClaimableBalanceLoader() *ClaimableBalanceLoader { +func NewClaimableBalanceLoader(concurrencyMode ConcurrencyMode) *ClaimableBalanceLoader { return &ClaimableBalanceLoader{ sealed: false, set: set.Set[string]{}, @@ -39,6 +39,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader { mappingFromRow: func(row HistoryClaimableBalance) (string, int64) { return row.BalanceID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index f5759015c7..490d5a0f70 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -17,6 +18,12 @@ func TestClaimableBalanceLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testCBLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testCBLoader(t, tt, session, ConcurrentDeletes) +} + +func testCBLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { balanceID := xdr.ClaimableBalanceId{ @@ -28,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) { ids = append(ids, id) } - loader := NewClaimableBalanceLoader() + loader := NewClaimableBalanceLoader(mode) var futures []FutureClaimableBalanceID for _, id := range ids { future := loader.GetFuture(id) @@ -70,7 +77,7 @@ func TestClaimableBalanceLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewClaimableBalanceLoader() + loader = NewClaimableBalanceLoader(mode) for i := 100; i < 110; i++ { balanceID := xdr.ClaimableBalanceId{ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go index e1ac998953..e917983b8f 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go @@ -20,7 +20,7 @@ func TestAddEffect(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/effect_test.go b/services/horizon/internal/db2/history/effect_test.go index 19af0ceff8..ba59cf3fd4 100644 --- a/services/horizon/internal/db2/history/effect_test.go +++ b/services/horizon/internal/db2/history/effect_test.go @@ -23,7 +23,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Effect address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := 
NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ -47,7 +47,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "abcde" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID))) @@ -78,7 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index da6563c732..e161d686d1 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -288,7 +288,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err = json.Marshal(map[string]interface{}{"new_seq": 98}) tt.Assert.NoError(err) - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) err = effectBuilder.Add( accountLoader.GetFuture(account.Address()), diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 3d23451937..9fee9513c2 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "strconv" - "strings" sq "github.com/Masterminds/squirrel" @@ -207,41 +206,26 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) ( return value, nil } -type KeyValuePair struct { - Key string `db:"key"` - Value string `db:"value"` -} - -func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) { - keys := make([]string, 0, len(historyLookupTables)) - for table := range historyLookupTables { - keys = append(keys, table+lookupTableReapOffsetSuffix) - } - offsets := map[string]int64{} - var pairs []KeyValuePair - query := sq.Select("key", "value"). +func (q *Q) getLookupTableReapOffset(ctx context.Context, table string) (int64, error) { + query := sq.Select("value"). From("key_value_store"). 
Where(map[string]interface{}{ - "key": keys, + "key": table + lookupTableReapOffsetSuffix, }) - err := q.Select(ctx, &pairs, query) + var text string + err := q.Get(ctx, &text, query) if err != nil { - return nil, err - } - for _, pair := range pairs { - table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) - if _, ok := historyLookupTables[table]; !ok { - return nil, fmt.Errorf("invalid key: %s", pair.Key) - } - - var offset int64 - offset, err = strconv.ParseInt(pair.Value, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid offset: %s", pair.Value) + if errors.Cause(err) == sql.ErrNoRows { + return 0, nil } - offsets[table] = offset + return 0, err + } + var offset int64 + offset, err = strconv.ParseInt(text, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid offset: %s for table %s", text, table) } - return offsets, err + return offset, nil } func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error { diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index a03caaa988..5da2a7b6fd 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -19,7 +19,7 @@ type FutureLiquidityPoolID = future[string, HistoryLiquidityPool] type LiquidityPoolLoader = loader[string, HistoryLiquidityPool] // NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance. -func NewLiquidityPoolLoader() *LiquidityPoolLoader { +func NewLiquidityPoolLoader(concurrencyMode ConcurrencyMode) *LiquidityPoolLoader { return &LiquidityPoolLoader{ sealed: false, set: set.Set[string]{}, @@ -39,7 +39,8 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader { mappingFromRow: func(row HistoryLiquidityPool) (string, int64) { return row.PoolID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } @@ -51,7 +52,7 @@ type LiquidityPoolLoaderStub struct { // NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { - return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()} + return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader(ConcurrentInserts)} } // Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index aec2fcd886..c7a1282760 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -16,6 +17,12 @@ func TestLiquidityPoolLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testLPLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testLPLoader(t, tt, session, ConcurrentDeletes) +} + +func testLPLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { poolID := xdr.PoolId{byte(i)} @@ -24,7 +31,7 @@ func TestLiquidityPoolLoader(t *testing.T) { ids = append(ids, id) } - loader := NewLiquidityPoolLoader() + loader := NewLiquidityPoolLoader(mode) for _, id 
:= range ids { future := loader.GetFuture(id) _, err := future.Value() @@ -62,7 +69,7 @@ func TestLiquidityPoolLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewLiquidityPoolLoader() + loader = NewLiquidityPoolLoader(mode) for i := 100; i < 110; i++ { poolID := xdr.PoolId{byte(i)} var id string diff --git a/services/horizon/internal/db2/history/loader.go b/services/horizon/internal/db2/history/loader.go new file mode 100644 index 0000000000..dc2236accb --- /dev/null +++ b/services/horizon/internal/db2/history/loader.go @@ -0,0 +1,365 @@ +package history + +import ( + "context" + "database/sql/driver" + "fmt" + "sort" + "strings" + + "github.com/lib/pq" + + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/db" +) + +var errSealed = fmt.Errorf("cannot register more entries to Loader after calling Exec()") + +// ConcurrencyMode is used to configure the level of thread-safety for a loader +type ConcurrencyMode int + +func (cm ConcurrencyMode) String() string { + switch cm { + case ConcurrentInserts: + return "ConcurrentInserts" + case ConcurrentDeletes: + return "ConcurrentDeletes" + default: + return "unknown" + } +} + +const ( + _ ConcurrencyMode = iota + // ConcurrentInserts configures the loader to maintain safety when there are multiple loaders + // inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion. + // Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the + // same table. + ConcurrentInserts + // ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking + // reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for + // live ingestion when reaping of lookup tables is enabled. + // Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the + // same table. + ConcurrentDeletes +) + +// LoaderStats describes the result of executing a history lookup id Loader +type LoaderStats struct { + // Total is the number of elements registered to the Loader + Total int + // Inserted is the number of elements inserted into the lookup table + Inserted int +} + +type loader[K comparable, T any] struct { + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool + concurrencyMode ConcurrencyMode +} + +type future[K comparable, T any] struct { + key K + loader *loader[K, T] +} + +// Value implements the database/sql/driver Valuer interface. +func (f future[K, T]) Value() (driver.Value, error) { + return f.loader.GetNow(f.key) +} + +// GetFuture registers the given key into the Loader and +// returns a future which will hold the history id for +// the key after Exec() is called. +func (l *loader[K, T]) GetFuture(key K) future[K, T] { + if l.sealed { + panic(errSealed) + } + + l.set.Add(key) + return future[K, T]{ + key: key, + loader: l, + } +} + +// GetNow returns the history id for the given key. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow +// call can succeed. 
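Per the ConcurrencyMode comments above, the intended mapping from ingestion setup to mode can be summarized in a small helper. This is a hypothetical convenience for illustration only; the patch itself wires the modes directly at each call site.

package example

import "github.com/stellar/go/services/horizon/internal/db2/history"

// loaderModeFor summarizes the ConcurrencyMode doc comments: parallel
// reingestion runs many inserting transactions at once (so concurrent
// deletes are not safe), while live ingestion runs a single inserting
// transaction alongside the lookup table reaper.
func loaderModeFor(parallelReingestion bool) history.ConcurrencyMode {
	if parallelReingestion {
		return history.ConcurrentInserts
	}
	return history.ConcurrentDeletes
}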
+func (l *loader[K, T]) GetNow(key K) (int64, error) { + if !l.sealed { + return 0, fmt.Errorf(`invalid loader state, + Exec was not called yet to properly seal and resolve %v id`, key) + } + if internalID, ok := l.ids[key]; !ok { + return 0, fmt.Errorf(`loader key %v was not found`, key) + } else { + return internalID, nil + } +} + +// Exec will look up all the history ids for the keys registered in the Loader. +// If there are no history ids for a given set of keys, Exec will insert rows +// into the corresponding history table to establish a mapping between each key and its history id. +func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { + l.sealed = true + if len(l.set) == 0 { + return nil + } + q := &Q{session} + keys := make([]K, 0, len(l.set)) + for key := range l.set { + keys = append(keys, key) + } + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return l.less(keys[i], keys[j]) + }) + + if l.concurrencyMode == ConcurrentInserts { + // if there are other ingestion transactions running concurrently, + // we need to first insert the records (with a ON CONFLICT DO NOTHING + // clause). Then, we can query for the remaining records. + // This order (insert first and then query) is important because + // if multiple concurrent transactions try to insert the same record + // only one of them will succeed and the other transactions will omit + // the record from the RETURNING set. + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } + + if count, err := l.query(ctx, q, keys, false); err != nil { + return err + } else { + l.stats.Total += count + } + } else if l.concurrencyMode == ConcurrentDeletes { + // if the lookup table reaping transaction is running concurrently, + // we need to lock the rows from the lookup table to ensure that + // the reaper cannot run until after the ingestion transaction has + // been committed. + if count, err := l.query(ctx, q, keys, true); err != nil { + return err + } else { + l.stats.Total += count + } + + // insert whatever records were not found from l.query() + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } + } else { + return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode) + } + + return nil +} + +// Stats returns the number of addresses registered in the Loader and the number of rows +// inserted into the history table. 
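As a concrete accounting example for the two Exec branches above (mirroring the concurrency test added later in this patch): with 15 registered keys of which 10 already have rows in the lookup table, the insert step adds 5 rows and the query step resolves the other 10.

package example

import "github.com/stellar/go/services/horizon/internal/db2/history"

// Accounting example for Exec: insert contributes to both Total and
// Inserted, while the query step contributes only to Total.
var exampleStats = history.LoaderStats{
	Total:    15, // every registered key resolved to a history id
	Inserted: 5,  // only the keys missing from the lookup table were inserted
}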
+func (l *loader[K, T]) Stats() LoaderStats { + return l.stats +} + +func (l *loader[K, T]) Name() string { + return l.name +} + +func (l *loader[K, T]) filter(keys []K) []K { + if len(l.ids) == 0 { + return keys + } + + remaining := make([]K, 0, len(keys)) + for _, key := range keys { + if _, ok := l.ids[key]; ok { + continue + } + remaining = append(remaining, key) + } + return remaining +} + +func (l *loader[K, T]) updateMap(rows []T) { + for _, row := range rows { + key, id := l.mappingFromRow(row) + l.ids[key] = id + } +} + +func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } + + var rows []T + err := bulkInsert( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil +} + +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, lockRows bool) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } + var suffix string + if lockRows { + suffix = "ORDER BY id ASC FOR KEY SHARE" + } + + var rows []T + err := bulkGet( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + suffix, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil +} + +type columnValues struct { + name string + dbType string + objects []string +} + +func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk insert query which looks like: + // + // WITH rows AS + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // INSERT INTO table ( + // /* insertFieldsPart */ + // field1, + // field2, + // ... + // ) + // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple insert statements to insert + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + columns := strings.Join(insertFieldsPart, ",") + + sql := ` + WITH rows AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + columns + `) + SELECT * FROM rows + ON CONFLICT (` + columns + `) DO NOTHING + RETURNING *` + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} + +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error { + unnestPart := make([]string, 0, len(fields)) + columns := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk get query which looks like: + // + // SELECT * FROM table WHERE (field1, field2, ...) IN + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... 
+ // ) + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple select statements to obtain + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + columns = append( + columns, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN + (SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} diff --git a/services/horizon/internal/db2/history/loader_concurrency_test.go b/services/horizon/internal/db2/history/loader_concurrency_test.go new file mode 100644 index 0000000000..e87d901bd8 --- /dev/null +++ b/services/horizon/internal/db2/history/loader_concurrency_test.go @@ -0,0 +1,197 @@ +package history + +import ( + "context" + "database/sql" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestLoaderConcurrentInserts(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + s1 := tt.HorizonSession() + s2 := s1.Clone() + + for _, testCase := range []struct { + mode ConcurrencyMode + pass bool + }{ + {ConcurrentInserts, true}, + {ConcurrentDeletes, false}, + } { + t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) { + var addresses []string + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + l1 := NewAccountLoader(testCase.mode) + for _, address := range addresses { + l1.GetFuture(address) + } + + for i := 0; i < 5; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + l2 := NewAccountLoader(testCase.mode) + for _, address := range addresses { + l2.GetFuture(address) + } + + assert.NoError(t, s1.Begin(context.Background())) + assert.NoError(t, l1.Exec(context.Background(), s1)) + + assert.NoError(t, s2.Begin(context.Background())) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-time.After(time.Second * 3) + assert.NoError(t, s1.Commit()) + }() + // l2.Exec(context.Background(), s2) will block until s1 + // is committed because s1 and s2 both attempt to insert common + // accounts and, since s1 executed first, s2 must wait until + // s1 terminates. 
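Stepping back from the test for a moment: under ConcurrentDeletes the loader's query step passes lockRows=true, so the statement produced by bulkGet for history_accounts renders roughly as below (again assuming a text-typed address column for illustration).

package example

// Approximate output of bulkGet for history_accounts when lockRows is true.
// FOR KEY SHARE conflicts with the reaper's SELECT ... FOR UPDATE, so a
// concurrent reaping transaction blocks until this ingestion transaction
// commits (or vice versa).
const exampleBulkGetSQL = `SELECT * FROM history_accounts WHERE (address) IN
	(SELECT unnest(?::text[]) /* address */) ORDER BY id ASC FOR KEY SHARE`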
+ assert.NoError(t, l2.Exec(context.Background(), s2)) + assert.NoError(t, s2.Commit()) + wg.Wait() + + assert.Equal(t, LoaderStats{ + Total: 10, + Inserted: 10, + }, l1.Stats()) + + if testCase.pass { + assert.Equal(t, LoaderStats{ + Total: 15, + Inserted: 5, + }, l2.Stats()) + } else { + assert.Equal(t, LoaderStats{ + Total: 5, + Inserted: 5, + }, l2.Stats()) + return + } + + q := &Q{s1} + for _, address := range addresses[:10] { + l1Id, err := l1.GetNow(address) + assert.NoError(t, err) + + l2Id, err := l2.GetNow(address) + assert.NoError(t, err) + assert.Equal(t, l1Id, l2Id) + + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, l1Id) + assert.Equal(t, account.Address, address) + } + + for _, address := range addresses[10:] { + l2Id, err := l2.GetNow(address) + assert.NoError(t, err) + + _, err = l1.GetNow(address) + assert.ErrorContains(t, err, "was not found") + + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, l2Id) + assert.Equal(t, account.Address, address) + } + }) + } +} + +func TestLoaderConcurrentDeletes(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + s1 := tt.HorizonSession() + s2 := s1.Clone() + + for _, testCase := range []struct { + mode ConcurrencyMode + pass bool + }{ + {ConcurrentInserts, false}, + {ConcurrentDeletes, true}, + } { + t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) { + var addresses []string + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + loader := NewAccountLoader(testCase.mode) + for _, address := range addresses { + loader.GetFuture(address) + } + assert.NoError(t, loader.Exec(context.Background(), s1)) + + var ids []int64 + for _, address := range addresses { + id, err := loader.GetNow(address) + assert.NoError(t, err) + ids = append(ids, id) + } + + loader = NewAccountLoader(testCase.mode) + for _, address := range addresses { + loader.GetFuture(address) + } + + assert.NoError(t, s1.Begin(context.Background())) + assert.NoError(t, loader.Exec(context.Background(), s1)) + + assert.NoError(t, s2.Begin(context.Background())) + q2 := &Q{s2} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-time.After(time.Second * 3) + + q1 := &Q{s1} + for _, address := range addresses { + id, err := loader.GetNow(address) + assert.NoError(t, err) + + var account Account + err = q1.AccountByAddress(context.Background(), &account, address) + if testCase.pass { + assert.NoError(t, err) + assert.Equal(t, account.ID, id) + assert.Equal(t, account.Address, address) + } else { + assert.ErrorContains(t, err, sql.ErrNoRows.Error()) + } + } + assert.NoError(t, s1.Commit()) + }() + + // the reaper should block until s1 has been committed because s1 has locked + // the orphaned rows + deletedCount, err := q2.reapLookupTable(context.Background(), "history_accounts", ids, 1000) + assert.NoError(t, err) + assert.Equal(t, int64(len(addresses)), deletedCount) + assert.NoError(t, s2.Commit()) + + wg.Wait() + }) + } +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index e9d8ffb185..59d828d091 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -9,6 +9,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strconv" "strings" "sync" "time" @@ -282,7 +283,8 @@ type 
IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error) + ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) + FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -307,6 +309,7 @@ type IngestionQ interface { GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error) TryStateVerificationLock(context.Context) (bool, error) TryReaperLock(context.Context) (bool, error) + TryLookupTableReaperLock(ctx context.Context) (bool, error) ElderLedger(context.Context, interface{}) error } @@ -977,63 +980,70 @@ type LookupTableReapResult struct { Duration time.Duration } -// ReapLookupTables removes rows from lookup tables like history_claimable_balances -// which aren't used (orphaned), i.e. history entries for them were reaped. -// This method must be executed inside ingestion transaction. Otherwise it may -// create invalid state in lookup and history tables. -func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) ( - map[string]LookupTableReapResult, - error, -) { - if q.GetTx() == nil { - return nil, errors.New("cannot be called outside of an ingestion transaction") +func (q *Q) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) { + offset, err := q.getLookupTableReapOffset(ctx, table) + if err != nil { + return nil, 0, fmt.Errorf("could not obtain offsets: %w", err) } - offsets, err := q.getLookupTableReapOffsets(ctx) + // Find new offset before removing the rows + var newOffset int64 + err = q.GetRaw( + ctx, + &newOffset, + fmt.Sprintf( + "SELECT id FROM %s WHERE id >= %d ORDER BY id ASC LIMIT 1 OFFSET %d", + table, offset, batchSize, + ), + ) if err != nil { - return nil, fmt.Errorf("could not obtain offsets: %w", err) + if q.NoRows(err) { + newOffset = 0 + } else { + return nil, 0, err + } } - results := map[string]LookupTableReapResult{} - for table, historyTables := range historyLookupTables { - startTime := time.Now() - query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) + var ids []int64 + err = q.SelectRaw(ctx, &ids, constructFindReapLookupTablesQuery(table, batchSize, offset)) + if err != nil { + return nil, 0, fmt.Errorf("could not query orphaned rows: %w", err) + } - // Find new offset before removing the rows - var newOffset int64 - err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) - if err != nil { - if q.NoRows(err) { - newOffset = 0 - } else { - return nil, err - } - } + return ids, newOffset, nil +} - res, err := q.ExecRaw( - context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), - query, - ) - if err != nil { - return nil, errors.Wrapf(err, "error running query: %s", query) - } +func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) { + if err := q.Begin(ctx); err != nil { + return 0, fmt.Errorf("could not start transaction: %w", err) + } + defer q.Rollback() - if err = 
q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { - return nil, fmt.Errorf("error updating offset: %w", err) - } + rowsDeleted, err := q.reapLookupTable(ctx, table, ids, newOffset) + if err != nil { + return 0, err + } - rows, err := res.RowsAffected() - if err != nil { - return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) - } + if err := q.Commit(); err != nil { + return 0, fmt.Errorf("could not commit transaction: %w", err) + } + return rowsDeleted, nil +} + +func (q *Q) reapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) { + if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { + return 0, fmt.Errorf("error updating offset for table %s: %w ", table, err) + } - results[table] = LookupTableReapResult{ - Offset: newOffset, - RowsDeleted: rows, - Duration: time.Since(startTime), + var rowsDeleted int64 + if len(ids) > 0 { + var err error + rowsDeleted, err = q.deleteLookupTableRows(ctx, table, ids) + if err != nil { + return 0, fmt.Errorf("could not delete orphaned rows: %w", err) } } - return results, nil + return rowsDeleted, nil } var historyLookupTables = map[string][]tableObjectFieldPair{ @@ -1100,54 +1110,100 @@ var historyLookupTables = map[string][]tableObjectFieldPair{ }, } -// constructReapLookupTablesQuery creates a query like (using history_claimable_balances +func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) { + deleteQuery := constructDeleteLookupTableRowsQuery(table, ids) + result, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), + deleteQuery, + ) + if err != nil { + return 0, fmt.Errorf("error running query %s : %w", deleteQuery, err) + } + var deletedCount int64 + deletedCount, err = result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("error getting deleted count: %w", err) + } + return deletedCount, nil +} + +// constructDeleteLookupTableRowsQuery creates a query like (using history_claimable_balances // as an example): // -// delete from history_claimable_balances where id in ( -// -// WITH ha_batch AS ( -// SELECT id -// FROM history_claimable_balances -// WHERE id >= 1000 -// ORDER BY id limit 1000 -// ) SELECT e1.id as id FROM ha_batch e1 +// WITH ha_batch AS ( +// SELECT id +// FROM history_claimable_balances +// WHERE IN ($1, $2, ...) ORDER BY id asc FOR UPDATE +// ) DELETE FROM history_claimable_balances WHERE id IN ( +// SELECT e1.id as id FROM ha_batch e1 // WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1) // AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1) // ) // -// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances +// It checks each of the candidate rows provided in the top level IN clause // and counts occurrences of each row in corresponding history tables. // If there are no history rows for a given id, the row in // history_claimable_balances is removed. // -// The offset param should be increased before each execution. Given that -// some rows will be removed and some will be added by ingestion it's -// possible that rows will be skipped from deletion. But offset is reset -// when it reaches the table size so eventually all orphaned rows are -// deleted. 
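The replacement API is used in two steps, as the updated reap_test.go helper later in this patch also shows: FindLookupTableRowsToReap scans a batch of candidate ids starting from the persisted offset, and ReapLookupTable deletes the orphaned ones and advances the offset in its own short transaction. A minimal sketch, with an illustrative helper name:

package example

import (
	"context"

	"github.com/stellar/go/services/horizon/internal/db2/history"
)

// reapOnce sketches the two-step flow: find a batch of candidate ids starting
// at the persisted offset, then delete the orphaned ones (and persist the new
// offset) inside ReapLookupTable's own transaction.
func reapOnce(ctx context.Context, q *history.Q, table string, batchSize int) (int64, error) {
	ids, newOffset, err := q.FindLookupTableRowsToReap(ctx, table, batchSize)
	if err != nil {
		return 0, err
	}
	return q.ReapLookupTable(ctx, table, ids, newOffset)
}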
-func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string { +// Note that the rows are locked using via SELECT FOR UPDATE. The reason +// for that is to maintain safety when ingestion is running concurrently. +// The ingestion loaders will also lock rows from the history lookup tables +// via SELECT FOR KEY SHARE. This will ensure that the reaping transaction +// will block until the ingestion transaction commits (or vice-versa). +func constructDeleteLookupTableRowsQuery(table string, ids []int64) string { + var conditions []string + for _, referencedTable := range historyLookupTables[table] { + conditions = append( + conditions, + fmt.Sprintf( + "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", + referencedTable.name, + referencedTable.name, referencedTable.objectField, + ), + ) + } + + stringIds := make([]string, len(ids)) + for i, id := range ids { + stringIds[i] = strconv.FormatInt(id, 10) + } + innerQuery := fmt.Sprintf( + "SELECT id FROM %s WHERE id IN (%s) ORDER BY id asc FOR UPDATE", + table, + strings.Join(stringIds, ", "), + ) + + deleteQuery := fmt.Sprintf( + "WITH ha_batch AS (%s) DELETE FROM %s WHERE id IN ("+ + "SELECT e1.id as id FROM ha_batch e1 WHERE %s)", + innerQuery, + table, + strings.Join(conditions, " AND "), + ) + return deleteQuery +} + +func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string { var conditions []string - for _, historyTable := range historyTables { + for _, referencedTable := range historyLookupTables[table] { conditions = append( conditions, fmt.Sprintf( "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", - historyTable.name, - historyTable.name, historyTable.objectField, + referencedTable.name, + referencedTable.name, referencedTable.objectField, ), ) } return fmt.Sprintf( - "DELETE FROM %s WHERE id IN ("+ - "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+ + "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id ASC limit %d) "+ "SELECT e1.id as id FROM ha_batch e1 WHERE ", table, - table, offset, batchSize, - ) + strings.Join(conditions, " AND ") + ")" + ) + strings.Join(conditions, " AND ") } // DeleteRangeAll deletes a range of rows from all history tables between diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 1a28b9e584..a86f4c14a4 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -69,20 +69,34 @@ func TestElderLedger(t *testing.T) { } } +func TestConstructDeleteLookupTableRowsQuery(t *testing.T) { + query := constructDeleteLookupTableRowsQuery( + "history_accounts", + []int64{100, 20, 30}, + ) + + assert.Equal(t, + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) "+ + "DELETE FROM history_accounts WHERE id IN (SELECT e1.id as id FROM ha_batch e1 "+ + "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ + "AND NOT 
EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) +} + func TestConstructReapLookupTablesQuery(t *testing.T) { - query := constructReapLookupTablesQuery( + query := constructFindReapLookupTablesQuery( "history_accounts", - historyLookupTables["history_accounts"], 10, 0, ) assert.Equal(t, - "DELETE FROM history_accounts WHERE id IN ("+ - "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+ + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id ASC limit 10) SELECT e1.id as id FROM ha_batch e1 "+ "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ - "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1)", query) } diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index 7e823064f2..eb30fe6659 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -15,7 +15,7 @@ func TestAddOperationParticipants(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) address := keypair.MustRandom().Address() tt.Assert.NoError(q.Begin(tt.Ctx)) builder := q.NewOperationParticipantBatchInsertBuilder() diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index 1d20a9cb10..8899bfcdf6 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -125,7 +125,7 @@ func TestOperationByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpOperationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpOperationBuilder.Add(opID1, lpLoader.GetFuture(liquidityPoolID))) diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 37f7654abb..15d09da0ac 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -35,7 +35,7 @@ func TestTransactionParticipantsBatch(t *testing.T) { q := &Q{tt.HorizonSession()} batch := q.NewTransactionParticipantsBatchInsertBuilder() - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) transactionID := int64(1) otherTransactionID := int64(2) diff --git 
a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 5601cd19b6..af20fbc976 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -1,13 +1,43 @@ package history_test import ( + "context" "testing" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test" ) +type reapResult struct { + Offset int64 + RowsDeleted int64 +} + +func reapLookupTables(t *testing.T, q *history.Q, batchSize int) map[string]reapResult { + results := map[string]reapResult{} + + for _, table := range []string{ + "history_accounts", + "history_assets", + "history_claimable_balances", + "history_liquidity_pools", + } { + ids, offset, err := q.FindLookupTableRowsToReap(context.Background(), table, batchSize) + assert.NoError(t, err) + rowsDeleted, err := q.ReapLookupTable(context.Background(), table, ids, offset) + assert.NoError(t, err) + results[table] = reapResult{ + Offset: offset, + RowsDeleted: rowsDeleted, + } + } + + return results +} + func TestReapLookupTables(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -46,14 +76,7 @@ func TestReapLookupTables(t *testing.T) { q := &history.Q{tt.HorizonSession()} - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err := q.ReapLookupTables(tt.Ctx, 5) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) + results := reapLookupTables(t, q, 5) err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) tt.Require.NoError(err) @@ -91,14 +114,7 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err = q.ReapLookupTables(tt.Ctx, 5) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) + results = reapLookupTables(t, q, 5) err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) tt.Require.NoError(err) @@ -121,14 +137,7 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err = q.ReapLookupTables(tt.Ctx, 1000) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) + results = reapLookupTables(t, q, 1000) err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) tt.Require.NoError(err) diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 65c6734644..bd8cb1673c 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -79,7 +79,7 @@ func TestTransactionByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpTransactionBuilder := q.NewTransactionLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpTransactionBuilder.Add(txID, lpLoader.GetFuture(liquidityPoolID))) tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) @@ -940,15 +940,15 @@ func 
TestTransactionQueryBuilder(t *testing.T) { tt.Assert.NoError(q.Begin(tt.Ctx)) address := "GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) accountLoader.GetFuture(address) cbID := "00000000178826fbfe339e1f5c53417c6fedfe2c05e8bec14303143ec46b38981b09c3f9" - cbLoader := NewClaimableBalanceLoader() + cbLoader := NewClaimableBalanceLoader(ConcurrentInserts) cbLoader.GetFuture(cbID) lpID := "0000a8198b5e25994c1ca5b0556faeb27325ac746296944144e0a7406d501e8a" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpLoader.GetFuture(lpID) tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q)) diff --git a/services/horizon/internal/db2/history/verify_lock.go b/services/horizon/internal/db2/history/verify_lock.go index 29bc11a473..4d7d1fbde7 100644 --- a/services/horizon/internal/db2/history/verify_lock.go +++ b/services/horizon/internal/db2/history/verify_lock.go @@ -13,9 +13,13 @@ const ( // all ingesting nodes use the same value which is why it's hard coded here.`1 stateVerificationLockId = 73897213 // reaperLockId is the objid for the advisory lock acquired during - // reaping. The value is arbitrary. The only requirement is that + // reaping of history tables. The value is arbitrary. The only requirement is that // all ingesting nodes use the same value which is why it's hard coded here. reaperLockId = 944670730 + // lookupTableReaperLockId is the objid for the advisory lock acquired during + // reaping of lookup tables. The value is arbitrary. The only requirement is that + // all ingesting nodes use the same value which is why it's hard coded here. + lookupTableReaperLockId = 329518896 ) // TryStateVerificationLock attempts to acquire the state verification lock @@ -34,6 +38,10 @@ func (q *Q) TryReaperLock(ctx context.Context) (bool, error) { return q.tryAdvisoryLock(ctx, reaperLockId) } +func (q *Q) TryLookupTableReaperLock(ctx context.Context) (bool, error) { + return q.tryAdvisoryLock(ctx, lookupTableReaperLockId) +} + func (q *Q) tryAdvisoryLock(ctx context.Context, lockId int) (bool, error) { if tx := q.GetTx(); tx == nil { return false, errors.New("cannot be called outside of a transaction") diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 64e4558723..63ee7ba457 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -70,16 +70,15 @@ const ( // * Ledger ingestion, // * State verifications, // * Metrics updates. - // * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock) - MaxDBConnections = 5 + // * Reaping of history (requires 2 connections, the extra connection is used for holding the advisory lock) + // * Reaping of lookup tables (requires 2 connections, the extra connection is used for holding the advisory lock) + MaxDBConnections = 7 stateVerificationErrorThreshold = 3 // 100 ledgers per flush has shown in stress tests // to be best point on performance curve, default to that. MaxLedgersPerFlush uint32 = 100 - - reapLookupTablesBatchSize = 1000 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -172,9 +171,6 @@ type Metrics struct { // duration of rebuilding trade aggregation buckets. 
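Relatedly, TryLookupTableReaperLock (added in verify_lock.go above) must be called inside a transaction, and the extra connection budgeted in MaxDBConnections is there to hold that advisory lock while the deletes run on another session. The sketch below is hypothetical, since the lookupTableReaper implementation itself is outside this hunk.

package example

import (
	"context"

	"github.com/stellar/go/services/horizon/internal/db2/history"
)

// withLookupTableReaperLock is a hypothetical guard: the advisory lock is
// taken inside a transaction on a dedicated session and held for the whole
// reaping pass (the deletes themselves run on a separate session), so only
// one ingesting node reaps lookup tables at a time.
func withLookupTableReaperLock(ctx context.Context, q *history.Q, reap func(context.Context) error) error {
	if err := q.Begin(ctx); err != nil {
		return err
	}
	// Rolling back ends the transaction and releases the advisory lock.
	defer q.Rollback()

	acquired, err := q.TryLookupTableReaperLock(ctx)
	if err != nil || !acquired {
		return err // another node holds the lock, or the query failed
	}
	return reap(ctx)
}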
LedgerIngestionTradeAggregationDuration prometheus.Summary - ReapDurationByLookupTable *prometheus.SummaryVec - RowsReapedByLookupTable *prometheus.SummaryVec - // StateVerifyDuration exposes timing metrics about the rate and // duration of state verification. StateVerifyDuration prometheus.Summary @@ -256,7 +252,8 @@ type system struct { maxLedgerPerFlush uint32 - reaper *Reaper + reaper *Reaper + lookupTableReaper *lookupTableReaper currentStateMutex sync.Mutex currentState State @@ -369,6 +366,7 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), + lookupTableReaper: newLookupTableReaper(config.HistorySession), } system.initMetrics() @@ -409,18 +407,6 @@ func (s *system) initMetrics() { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) - s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", - Help: "reap lookup tables durations, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, []string{"table"}) - - s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", - Help: "rows deleted during lookup tables reap, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, []string{"table"}) - s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds", Help: "state verification durations, sliding window = 10m", @@ -538,8 +524,6 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LocalLatestLedger) registry.MustRegister(s.metrics.LedgerIngestionDuration) registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration) - registry.MustRegister(s.metrics.ReapDurationByLookupTable) - registry.MustRegister(s.metrics.RowsReapedByLookupTable) registry.MustRegister(s.metrics.StateVerifyDuration) registry.MustRegister(s.metrics.StateInvalidGauge) registry.MustRegister(s.metrics.LedgerStatsCounter) @@ -552,6 +536,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.IngestionErrorCounter) s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon") s.reaper.RegisterMetrics(registry) + s.lookupTableReaper.RegisterMetrics(registry) } // Run starts ingestion system. Ingestion system supports distributed ingestion @@ -822,55 +807,11 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { return } - err = s.historyQ.Begin(s.ctx) - if err != nil { - log.WithError(err).Error("Error starting a transaction") - return - } - defer s.historyQ.Rollback() - - // If so block ingestion in the cluster to reap tables - _, err = s.historyQ.GetLastLedgerIngest(s.ctx) - if err != nil { - log.WithError(err).Error(getLastIngestedErrMsg) - return - } - - // Make sure reaping will not take more than 5s, which is average ledger - // closing time. 
- ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() - - reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx, reapLookupTablesBatchSize) - if err != nil { - log.WithError(err).Warn("Error reaping lookup tables") - return - } - - err = s.historyQ.Commit() - if err != nil { - log.WithError(err).Error("Error committing a transaction") - return - } - - totalDeleted := int64(0) - reapLog := log - for table, result := range results { - totalDeleted += result.RowsDeleted - reapLog = reapLog.WithField(table+"_offset", result.Offset) - reapLog = reapLog.WithField(table+"_duration", result.Duration) - reapLog = reapLog.WithField(table+"_rows_deleted", result.RowsDeleted) - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) - } - - if totalDeleted > 0 { - reapLog.Info("Reaper deleted rows from lookup tables") - } - - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(float64(totalDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(time.Since(reapStart).Seconds()) + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.lookupTableReaper.deleteOrphanedRows(s.ctx) + }() } func (s *system) incrementStateVerificationErrors() int { diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index d5733ee5e4..470039fd92 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -462,6 +462,11 @@ func (m *mockDBQ) TryReaperLock(ctx context.Context) (bool, error) { return args.Get(0).(bool), args.Error(1) } +func (m *mockDBQ) TryLookupTableReaperLock(ctx context.Context) (bool, error) { + args := m.Called(ctx) + return args.Get(0).(bool), args.Error(1) +} + func (m *mockDBQ) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) { args := m.Called(ctx, start) return args.Get(0).(uint32), args.Get(1).(bool), args.Error(2) @@ -562,13 +567,14 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) { - args := m.Called(ctx, batchSize) - var r1 map[string]history.LookupTableReapResult - if args.Get(0) != nil { - r1 = args.Get(0).(map[string]history.LookupTableReapResult) - } - return r1, args.Error(2) +func (m *mockDBQ) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) { + args := m.Called(ctx, table, batchSize) + return args.Get(0).([]int64), args.Get(1).(int64), args.Error(2) +} + +func (m *mockDBQ) ReapLookupTable(ctx context.Context, table string, ids []int64, offset int64) (int64, error) { + args := m.Called(ctx, table, ids, offset) + return args.Get(0).(int64), args.Error(1) } func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error { diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index e6f0e0cf74..75b8645953 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -133,11 +133,11 @@ func buildChangeProcessor( }) } -func (s 
*ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) (groupLoaders, *groupTransactionProcessors) { - accountLoader := history.NewAccountLoader() - assetLoader := history.NewAssetLoader() - lpLoader := history.NewLiquidityPoolLoader() - cbLoader := history.NewClaimableBalanceLoader() +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor, concurrencyMode history.ConcurrencyMode) (groupLoaders, *groupTransactionProcessors) { + accountLoader := history.NewAccountLoader(concurrencyMode) + assetLoader := history.NewAssetLoader(concurrencyMode) + lpLoader := history.NewLiquidityPoolLoader(concurrencyMode) + cbLoader := history.NewClaimableBalanceLoader(concurrencyMode) loaders := newGroupLoaders([]horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader}) statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() @@ -366,7 +366,7 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, return nil } -func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta) ( +func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta, concurrencyMode history.ConcurrencyMode) ( transactionStats processors.StatsLedgerTransactionProcessorResults, transactionDurations runDurations, tradeStats processors.TradeStats, @@ -381,7 +381,7 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry groupTransactionFilterers := s.buildTransactionFilterer() // when in online mode, the submission result processor must always run (regardless of whether filter rules exist or not) groupFilteredOutProcessors := s.buildFilteredOutProcessor() - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, concurrencyMode) if err = registerTransactionProcessors( registry, @@ -494,7 +494,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger groupTransactionFilterers := s.buildTransactionFilterer() // intentionally skip filtered out processor groupFilteredOutProcessors := newGroupTransactionProcessors(nil, nil, nil) - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) startTime := time.Now() curHeap, sysHeap := getMemStats() @@ -611,7 +611,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( return } - transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger) + transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger, history.ConcurrentDeletes) stats.changeStats = changeStatsProcessor.GetResults() stats.changeDurations = groupChangeProcessors.processorsRunDurations diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index e6ce6b512c..82c712b737 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -248,7 +248,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ledgersProcessor := &processors.LedgersProcessor{} - _, processor := 
runner.buildTransactionProcessor(ledgersProcessor) + _, processor := runner.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go index ca918e08ea..5967ef41b1 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.cbLoader = history.NewClaimableBalanceLoader() + s.cbLoader = history.NewClaimableBalanceLoader(history.ConcurrentInserts) s.processor = NewClaimableBalancesTransactionProcessor( s.cbLoader, diff --git a/services/horizon/internal/ingest/processors/effects_processor_test.go b/services/horizon/internal/ingest/processors/effects_processor_test.go index 0243768fde..276f6fcb03 100644 --- a/services/horizon/internal/ingest/processors/effects_processor_test.go +++ b/services/horizon/internal/ingest/processors/effects_processor_test.go @@ -7,11 +7,11 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "math/big" + "strings" "testing" "github.com/guregu/null" - "math/big" - "strings" "github.com/stellar/go/keypair" "github.com/stellar/go/protocols/horizon/base" @@ -62,7 +62,7 @@ func TestEffectsProcessorTestSuiteLedger(t *testing.T) { func (s *EffectsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.mockBatchInsertBuilder = &history.MockEffectBatchInsertBuilder{} s.lcm = xdr.LedgerCloseMeta{ @@ -446,7 +446,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { } assert.True(t, err2 != nil || err == nil, s) }() - err = operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err = operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) }() } @@ -468,7 +468,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { ledgerSequence: 1, } // calling effects should error due to the unknown operation - err := operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err := operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) assert.Contains(t, err.Error(), "Unknown operation type") } @@ -2558,7 +2558,7 @@ type effect struct { } func assertIngestEffects(t *testing.T, operation transactionOperationWrapper, expected []effect) { - accountLoader := history.NewAccountLoader() + accountLoader := history.NewAccountLoader(history.ConcurrentInserts) mockBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} for _, expectedEffect := range expected { diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go index 8d08e44d44..cdafc5bcc3 100644 --- 
a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.lpLoader = history.NewLiquidityPoolLoader() + s.lpLoader = history.NewLiquidityPoolLoader(history.ConcurrentInserts) s.processor = NewLiquidityPoolsTransactionProcessor( s.lpLoader, diff --git a/services/horizon/internal/ingest/processors/participants_processor_test.go b/services/horizon/internal/ingest/processors/participants_processor_test.go index b81bd22f67..f6154b2b39 100644 --- a/services/horizon/internal/ingest/processors/participants_processor_test.go +++ b/services/horizon/internal/ingest/processors/participants_processor_test.go @@ -86,7 +86,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) SetupTest() { s.thirdTx.Envelope.V1.Tx.SourceAccount = aid.ToMuxedAccount() s.thirdTxID = toid.New(int32(sequence), 3, 0).ToInt64() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.addressToFuture = map[string]history.FutureAccountID{} for _, address := range s.addresses { s.addressToFuture[address] = s.accountLoader.GetFuture(address) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 07a61a4cde..63b56de993 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -16,6 +16,8 @@ import ( "github.com/stellar/go/toid" ) +const reapLookupTablesBatchSize = 1000 + // Reaper represents the history reaping subsystem of horizon. type Reaper struct { historyQ history.IngestionQ @@ -243,3 +245,117 @@ func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uin r.deleteBatchDuration.Observe(elapsedSeconds) return count, nil } + +type lookupTableReaper struct { + historyQ history.IngestionQ + reapLockQ history.IngestionQ + pending atomic.Bool + logger *logpkg.Entry + + reapDurationByLookupTable *prometheus.SummaryVec + rowsReapedByLookupTable *prometheus.SummaryVec +} + +func newLookupTableReaper(dbSession db.SessionInterface) *lookupTableReaper { + return &lookupTableReaper{ + historyQ: &history.Q{dbSession.Clone()}, + reapLockQ: &history.Q{dbSession.Clone()}, + pending: atomic.Bool{}, + logger: log.WithField("subservice", "lookuptable-reaper"), + reapDurationByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", + Help: "reap lookup tables durations, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"table", "type"}), + rowsReapedByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", + Help: "rows deleted during lookup tables reap, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"table"}), + } +} + +func (r *lookupTableReaper) RegisterMetrics(registry *prometheus.Registry) { + registry.MustRegister( + r.reapDurationByLookupTable, + r.rowsReapedByLookupTable, + ) +} + +func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { + // check if reap is already in progress on this horizon node + if !r.pending.CompareAndSwap(false, true) { + r.logger.Infof("existing reap already in progress, skipping request to 
start a new one") + return nil + } + defer r.pending.Store(false) + + if err := r.reapLockQ.Begin(ctx); err != nil { + return errors.Wrap(err, "error while starting reaper lock transaction") + } + defer func() { + if err := r.reapLockQ.Rollback(); err != nil { + r.logger.WithField("error", err).Error("failed to release reaper lock") + } + }() + // check if reap is already in progress on another horizon node + if acquired, err := r.reapLockQ.TryLookupTableReaperLock(ctx); err != nil { + return errors.Wrap(err, "error while acquiring reaper database lock") + } else if !acquired { + r.logger.Info("reap already in progress on another node") + return nil + } + + reapStart := time.Now() + var totalQueryDuration, totalDeleteDuration time.Duration + var totalDeleted int64 + for _, table := range []string{ + "history_accounts", "history_claimable_balances", + "history_assets", "history_liquidity_pools", + } { + startTime := time.Now() + ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize) + if err != nil { + r.logger.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") + return err + } + queryDuration := time.Since(startTime) + totalQueryDuration += queryDuration + + deleteStartTime := time.Now() + var rowsDeleted int64 + rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset) + if err != nil { + r.logger.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") + return err + } + deleteDuration := time.Since(deleteStartTime) + totalDeleteDuration += deleteDuration + + r.rowsReapedByLookupTable.With(prometheus.Labels{"table": table}). + Observe(float64(rowsDeleted)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}). + Observe(float64(queryDuration.Seconds())) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}). + Observe(float64(deleteDuration.Seconds())) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). + Observe(float64((queryDuration + deleteDuration).Seconds())) + + r.logger.WithField("table", table). + WithField("offset", offset). + WithField("rows_deleted", rowsDeleted). + WithField("query_duration", queryDuration.Seconds()). + WithField("delete_duration", deleteDuration.Seconds()). + Info("Reaper deleted rows from lookup tables") + } + + r.rowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). + Observe(float64(totalDeleted)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}). + Observe(float64(totalQueryDuration.Seconds())) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}). + Observe(float64(totalDeleteDuration.Seconds())) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}). 
+ Observe(time.Since(reapStart).Seconds()) + return nil +} diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index feb5e13bb0..534ec555f6 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -402,10 +402,6 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { - s.system.config.ReapLookupTables = true - defer func() { - s.system.config.ReapLookupTables = false - }() s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() @@ -425,12 +421,6 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - // Reap lookup tables: - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(101), nil) - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, nil, errors.New("error reaping objects")).Once() - s.historyQ.On("Rollback").Return(nil).Once() mockStats := &historyarchive.MockArchiveStats{} mockStats.On("GetBackendName").Return("name") mockStats.On("GetDownloads").Return(uint32(0))
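
For reference, the sketch below shows how the two-step reaping API introduced in this diff (FindLookupTableRowsToReap followed by ReapLookupTable, guarded by TryLookupTableReaperLock) could be driven from a standalone maintenance job. It is a minimal sketch and not part of the change itself: the package name reapsketch, the function reapOnce, and the batchSize parameter are illustrative, and wiring up the two database sessions is left out.

package reapsketch

import (
	"context"
	"fmt"

	"github.com/stellar/go/services/horizon/internal/db2/history"
)

// reapOnce runs a single reaping pass. lockQ and dataQ are assumed to be
// backed by separate sessions, mirroring the reapLockQ/historyQ split in
// lookupTableReaper: the advisory lock is tied to lockQ's transaction and
// stays held while the per-table queries and deletes run on dataQ.
func reapOnce(ctx context.Context, lockQ, dataQ *history.Q, batchSize int) error {
	// the advisory lock can only be acquired inside a transaction
	if err := lockQ.Begin(ctx); err != nil {
		return err
	}
	defer lockQ.Rollback()

	if acquired, err := lockQ.TryLookupTableReaperLock(ctx); err != nil {
		return err
	} else if !acquired {
		// another node is already reaping lookup tables
		return nil
	}

	for _, table := range []string{
		"history_accounts", "history_assets",
		"history_claimable_balances", "history_liquidity_pools",
	} {
		// find a batch of orphaned rows, along with the scan offset for this table
		ids, offset, err := dataQ.FindLookupTableRowsToReap(ctx, table, batchSize)
		if err != nil {
			return err
		}
		// delete the batch found above
		rowsDeleted, err := dataQ.ReapLookupTable(ctx, table, ids, offset)
		if err != nil {
			return err
		}
		fmt.Printf("%s: deleted %d rows (offset %d)\n", table, rowsDeleted, offset)
	}
	return nil
}

Keeping the advisory lock on a dedicated session is the same design choice the lookupTableReaper makes above (and the reason the MaxDBConnections comment counts an extra connection for holding the lock): one connection does nothing but hold the lock for the duration of the pass, while the per-table work runs on the other.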