Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Insert and query rows from history lookup tables with one query #5415

Merged
merged 15 commits into from
Aug 23, 2024
86 changes: 29 additions & 57 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)

// FutureAccountID represents a future history account.
Expand All @@ -24,8 +23,6 @@ type FutureAccountID struct {
loader *AccountLoader
}

const loaderLookupBatchSize = 50000
tamirms marked this conversation as resolved.
Show resolved Hide resolved

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address)
Expand Down Expand Up @@ -85,28 +82,10 @@ func (a *AccountLoader) GetNow(address string) (int64, error) {
}
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
for i := 0; i < len(addresses); i += loaderLookupBatchSize {
end := ordered.Min(len(addresses), i+loaderLookupBatchSize)

var accounts []Account
if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil {
return errors.Wrap(err, "could not select accounts")
}

for _, account := range accounts {
a.ids[account.Address] = account.ID
}
}
return nil
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

// Exec will look up all the history account ids for the addresses registered in the loader.
Expand All @@ -122,47 +101,32 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
for address := range a.set {
addresses = append(addresses, address)
}

if err := a.lookupKeys(ctx, q, addresses); err != nil {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
return err
}
a.stats.Total += len(addresses)

insert := 0
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
addresses[insert] = address
insert++
}
if insert == 0 {
return nil
}
addresses = addresses[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)

err := bulkInsert(
var accounts []Account
err := bulkGetOrCreate(
ctx,
q,
"history_accounts",
[]string{"address"},
[]bulkInsertField{
[]columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
},
&accounts,
)
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, addresses)
for _, account := range accounts {
a.ids[account.Address] = account.ID
}
a.stats.Total += len(accounts)
return nil
}

// Stats returns the number of addresses registered in the loader and the number of addresses
Expand All @@ -175,13 +139,13 @@ func (a *AccountLoader) Name() string {
return "AccountLoader"
}

type bulkInsertField struct {
type columnValues struct {
name string
dbType string
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
func bulkGetOrCreate(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))
Expand All @@ -201,20 +165,28 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
)
}

columns := strings.Join(insertFieldsPart, ",")
tamirms marked this conversation as resolved.
Show resolved Hide resolved
sql := `
WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
WITH rows AS
(SELECT ` + strings.Join(unnestPart, ",") + `),
inserted_rows AS (
INSERT INTO ` + table + `
(` + columns + `)
SELECT * FROM rows
ON CONFLICT (` + columns + `) DO NOTHING
RETURNING *
)
SELECT * FROM inserted_rows
tamirms marked this conversation as resolved.
Show resolved Hide resolved
UNION ALL
SELECT * FROM ` + table + ` WHERE (` + columns + `) IN
(SELECT * FROM rows)`

return q.SelectRaw(
ctx,
response,
sql,
pqArrays...,
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
Expand Down
28 changes: 26 additions & 2 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func TestAccountLoader(t *testing.T) {
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
Total: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
Expand All @@ -55,4 +54,29 @@ func TestAccountLoader(t *testing.T) {
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
tamirms marked this conversation as resolved.
Show resolved Hide resolved
loader = NewAccountLoader()
for _, address := range addresses {
future := loader.GetFuture(address)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, address := range addresses {
var internalId int64
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

}
82 changes: 21 additions & 61 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -101,37 +99,6 @@ func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
}
}

func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error {
var rows []Asset
for i := 0; i < len(keys); i += loaderLookupBatchSize {
end := ordered.Min(len(keys), i+loaderLookupBatchSize)
subset := keys[i:end]
args := make([]interface{}, 0, 3*len(subset))
placeHolders := make([]string, 0, len(subset))
for _, key := range subset {
args = append(args, key.Code, key.Type, key.Issuer)
placeHolders = append(placeHolders, "(?, ?, ?)")
}
rawSQL := fmt.Sprintf(
"SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s)",
strings.Join(placeHolders, ", "),
)
err := q.SelectRaw(ctx, &rows, rawSQL, args...)
if err != nil {
return errors.Wrap(err, "could not select assets")
}

for _, row := range rows {
a.ids[AssetKey{
Type: row.Type,
Code: row.Code,
Issuer: row.Issuer,
}] = row.ID
}
}
return nil
}

// Exec will look up all the history asset ids for the assets registered in the loader.
// If there are no history asset ids for a given set of assets, Exec will insert rows
// into the history_assets table.
Expand All @@ -146,64 +113,57 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
keys = append(keys, key)
}

if err := a.lookupKeys(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)

assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
assetTypes := make([]string, 0, len(keys))
assetCodes := make([]string, 0, len(keys))
assetIssuers := make([]string, 0, len(keys))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
continue
}
assetTypes = append(assetTypes, key.Type)
assetCodes = append(assetCodes, key.Code)
assetIssuers = append(assetIssuers, key.Issuer)
keys[insert] = key
insert++
}
if insert == 0 {
return nil
}
keys = keys[:insert]

err := bulkInsert(
var rows []Asset
err := bulkGetOrCreate(
ctx,
q,
"history_assets",
[]string{"asset_code", "asset_type", "asset_issuer"},
[]bulkInsertField{
[]columnValues{
{
name: "asset_code",
dbType: "character varying(12)",
objects: assetCodes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
{
name: "asset_type",
dbType: "character varying(64)",
objects: assetTypes,
},
{
name: "asset_issuer",
dbType: "character varying(56)",
objects: assetIssuers,
},
},
&rows,
)
if err != nil {
return err
}
a.stats.Inserted += insert
for _, row := range rows {
a.ids[AssetKey{
Type: row.Type,
Code: row.Code,
Issuer: row.Issuer,
}] = row.ID
}
a.stats.Total += len(keys)

return a.lookupKeys(ctx, q, keys)
return nil
}

// Stats returns the number of assets registered in the loader and the number of assets
Expand Down
32 changes: 30 additions & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func TestAssetLoader(t *testing.T) {
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
Total: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(AssetKey{Type: "invalid"})
Expand All @@ -106,4 +105,33 @@ func TestAssetLoader(t *testing.T) {
_, err = loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
loader = NewAssetLoader()
for _, key := range keys {
future := loader.GetFuture(key)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
}
assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
}, loader.Stats())

for _, key := range keys {
var internalID int64
internalID, err = loader.GetNow(key)
assert.NoError(t, err)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
} else {
assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer)
}
var assetID int64
assetID, err = q.GetAssetID(context.Background(), assetXDR)
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}
}
Loading
Loading