From d0d3901c0a5f9f8b6498024f291b4e0686ee68cb Mon Sep 17 00:00:00 2001 From: billettc Date: Wed, 20 Sep 2023 08:29:06 -0400 Subject: [PATCH] fix caching --- accountresolver/processor.go | 2 +- accountresolver/processor_test.go | 15 +++++++++++---- accountresolver/resolver.go | 32 +++++++++++++++++++++++++------ accountresolver/resolver_test.go | 12 ++++++------ cmd/firesol/tablelookup.go | 5 ++--- 5 files changed, 46 insertions(+), 20 deletions(-) diff --git a/accountresolver/processor.go b/accountresolver/processor.go index ac7969c0..76938470 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -271,7 +271,7 @@ func (p *Processor) manageAddressLookup(ctx context.Context, blockNum uint64, er func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *pbsol.ConfirmedTransaction) error { start := time.Now() for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups { - accs, _, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey) + accs, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey) if err != nil { return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err) } diff --git a/accountresolver/processor_test.go b/accountresolver/processor_test.go index 79d6b414..9ef557b0 100644 --- a/accountresolver/processor_test.go +++ b/accountresolver/processor_test.go @@ -71,7 +71,7 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) { err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount) + accounts, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount) require.Equal(t, expectedCreatedAccounts, accounts) } @@ -150,7 +150,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) { err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount) + accounts, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount) require.Equal(t, expectedCreatedAccounts, accounts) } @@ -228,10 +228,17 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) + accounts := NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys) + require.Equal(t, 2, len(accounts)) + err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAddressToExtend) + accounts = NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys) + require.Equal(t, 3, len(accounts)) + require.Equal(t, accounts[2], AddressTableLookupAccountProgram) + + accounts, err = resolver.Resolve(context.Background(), 185_914_862, tableLookupAddressToExtend) require.Equal(t, expectedCreatedAccounts, accounts) } @@ -317,6 +324,6 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend) + accounts, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend) require.Equal(t, expectedCreatedAccounts, accounts) } diff --git a/accountresolver/resolver.go b/accountresolver/resolver.go index 070d315f..ac6d3c46 100644 --- a/accountresolver/resolver.go +++ b/accountresolver/resolver.go @@ -11,18 +11,25 @@ import ( type AccountsResolver interface { Extend(ctx context.Context, blockNum uint64, trxHash []byte, key Account, accounts Accounts) error - Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error) + Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, error) StoreCursor(ctx context.Context, readerName string, cursor *Cursor) error GetCursor(ctx context.Context, readerName string) (*Cursor, error) } +type cacheItem struct { + blockNum uint64 + accounts Accounts +} + type KVDBAccountsResolver struct { store store.KVStore + cache map[string][]*cacheItem } func NewKVDBAccountsResolver(store store.KVStore) *KVDBAccountsResolver { return &KVDBAccountsResolver{ store: store, + cache: make(map[string][]*cacheItem), } } @@ -31,7 +38,7 @@ func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxH return nil } - currentAccounts, _, err := r.Resolve(ctx, blockNum, key) + currentAccounts, err := r.Resolve(ctx, blockNum, key) if err != nil { return fmt.Errorf("retreiving last accounts for key %q: %w", key, err) } @@ -51,25 +58,38 @@ func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxH return fmt.Errorf("flushing extended accounts for key %q: %w", key, err) } + r.cache[key.base58()] = append([]*cacheItem{{ + blockNum: blockNum, + accounts: extendedAccount, + }}, r.cache[key.base58()]...) + return nil } -func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error) { +func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, error) { + if cacheItems, ok := r.cache[key.base58()]; ok { + for _, cacheItem := range cacheItems { + if cacheItem.blockNum <= atBlockNum { + return cacheItem.accounts, nil + } + } + return nil, nil + } keyBytes := Keys.tableLookupPrefix(key) iter := r.store.Prefix(ctx, keyBytes, store.Unlimited) if iter.Err() != nil { - return nil, 0, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err()) + return nil, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err()) } for iter.Next() { item := iter.Item() _, keyBlockNum := Keys.unpackTableLookup(item.Key) if keyBlockNum <= atBlockNum { - return decodeAccounts(item.Value), keyBlockNum, nil + return decodeAccounts(item.Value), nil } } - return nil, 0, nil + return nil, nil } func (r *KVDBAccountsResolver) isKnownTransaction(ctx context.Context, transactionHash []byte) bool { diff --git a/accountresolver/resolver_test.go b/accountresolver/resolver_test.go index 87d25793..79ab9714 100644 --- a/accountresolver/resolver_test.go +++ b/accountresolver/resolver_test.go @@ -29,7 +29,7 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) { err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) + accounts, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 2, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) @@ -40,20 +40,20 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) { err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) - accounts, _, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) + accounts, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 2, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) require.Equal(t, accountFromBase58(t, a3), accounts[1]) - accounts, _, err = resolver.Resolve(context.Background(), 100, accountFromBase58(t, a1)) + accounts, err = resolver.Resolve(context.Background(), 100, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 3, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) require.Equal(t, accountFromBase58(t, a3), accounts[1]) require.Equal(t, accountFromBase58(t, a4), accounts[2]) - accounts, _, err = resolver.Resolve(context.Background(), 1000, accountFromBase58(t, a1)) + accounts, err = resolver.Resolve(context.Background(), 1000, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 3, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) @@ -116,7 +116,7 @@ func Test_Extend_Multiple_Accounts_Same_Block(t *testing.T) { err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) - accounts, _, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) + accounts, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 2, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) @@ -134,7 +134,7 @@ func Test_Extend_Multiple_Accounts_Same_Block(t *testing.T) { err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) - accounts, _, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) + accounts, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1)) require.NoError(t, err) require.Equal(t, 4, len(accounts)) require.Equal(t, accountFromBase58(t, a2), accounts[0]) diff --git a/cmd/firesol/tablelookup.go b/cmd/firesol/tablelookup.go index df661dba..76f90df0 100644 --- a/cmd/firesol/tablelookup.go +++ b/cmd/firesol/tablelookup.go @@ -3,13 +3,12 @@ package main import ( "fmt" - firecore "github.com/streamingfast/firehose-core" - pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" - "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" accountsresolver "github.com/streamingfast/firehose-solana/accountresolver" + pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" kvstore "github.com/streamingfast/kvdb/store" _ "github.com/streamingfast/kvdb/store/badger3" _ "github.com/streamingfast/kvdb/store/bigkv"