Skip to content

Commit

Permalink
fix caching
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Sep 20, 2023
1 parent 3bea2a0 commit d0d3901
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 20 deletions.
2 changes: 1 addition & 1 deletion accountresolver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (p *Processor) manageAddressLookup(ctx context.Context, blockNum uint64, er
func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *pbsol.ConfirmedTransaction) error {
start := time.Now()
for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups {
accs, _, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey)
accs, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey)
if err != nil {
return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err)
}
Expand Down
15 changes: 11 additions & 4 deletions accountresolver/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount)
accounts, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount)
require.Equal(t, expectedCreatedAccounts, accounts)
}

Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount)
accounts, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount)
require.Equal(t, expectedCreatedAccounts, accounts)
}

Expand Down Expand Up @@ -228,10 +228,17 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts := NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys)
require.Equal(t, 2, len(accounts))

err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAddressToExtend)
accounts = NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys)
require.Equal(t, 3, len(accounts))
require.Equal(t, accounts[2], AddressTableLookupAccountProgram)

accounts, err = resolver.Resolve(context.Background(), 185_914_862, tableLookupAddressToExtend)
require.Equal(t, expectedCreatedAccounts, accounts)
}

Expand Down Expand Up @@ -317,6 +324,6 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
accounts, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
require.Equal(t, expectedCreatedAccounts, accounts)
}
32 changes: 26 additions & 6 deletions accountresolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@ import (

type AccountsResolver interface {
Extend(ctx context.Context, blockNum uint64, trxHash []byte, key Account, accounts Accounts) error
Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error)
Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, error)
StoreCursor(ctx context.Context, readerName string, cursor *Cursor) error
GetCursor(ctx context.Context, readerName string) (*Cursor, error)
}

type cacheItem struct {
blockNum uint64
accounts Accounts
}

type KVDBAccountsResolver struct {
store store.KVStore
cache map[string][]*cacheItem
}

func NewKVDBAccountsResolver(store store.KVStore) *KVDBAccountsResolver {
return &KVDBAccountsResolver{
store: store,
cache: make(map[string][]*cacheItem),
}
}

Expand All @@ -31,7 +38,7 @@ func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxH
return nil
}

currentAccounts, _, err := r.Resolve(ctx, blockNum, key)
currentAccounts, err := r.Resolve(ctx, blockNum, key)
if err != nil {
return fmt.Errorf("retreiving last accounts for key %q: %w", key, err)
}
Expand All @@ -51,25 +58,38 @@ func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxH
return fmt.Errorf("flushing extended accounts for key %q: %w", key, err)
}

r.cache[key.base58()] = append([]*cacheItem{{
blockNum: blockNum,
accounts: extendedAccount,
}}, r.cache[key.base58()]...)

return nil
}

func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, uint64, error) {
func (r *KVDBAccountsResolver) Resolve(ctx context.Context, atBlockNum uint64, key Account) (Accounts, error) {
if cacheItems, ok := r.cache[key.base58()]; ok {
for _, cacheItem := range cacheItems {
if cacheItem.blockNum <= atBlockNum {
return cacheItem.accounts, nil
}
}
return nil, nil
}
keyBytes := Keys.tableLookupPrefix(key)
iter := r.store.Prefix(ctx, keyBytes, store.Unlimited)
if iter.Err() != nil {
return nil, 0, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err())
return nil, fmt.Errorf("querying accounts for key %q: %w", key, iter.Err())
}

for iter.Next() {
item := iter.Item()
_, keyBlockNum := Keys.unpackTableLookup(item.Key)
if keyBlockNum <= atBlockNum {
return decodeAccounts(item.Value), keyBlockNum, nil
return decodeAccounts(item.Value), nil
}
}

return nil, 0, nil
return nil, nil
}

func (r *KVDBAccountsResolver) isKnownTransaction(ctx context.Context, transactionHash []byte) bool {
Expand Down
12 changes: 6 additions & 6 deletions accountresolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) {
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
accounts, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 2, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
Expand All @@ -40,20 +40,20 @@ func TestKVDBAccountsResolver_Extended(t *testing.T) {
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
accounts, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 2, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
require.Equal(t, accountFromBase58(t, a3), accounts[1])

accounts, _, err = resolver.Resolve(context.Background(), 100, accountFromBase58(t, a1))
accounts, err = resolver.Resolve(context.Background(), 100, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 3, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
require.Equal(t, accountFromBase58(t, a3), accounts[1])
require.Equal(t, accountFromBase58(t, a4), accounts[2])

accounts, _, err = resolver.Resolve(context.Background(), 1000, accountFromBase58(t, a1))
accounts, err = resolver.Resolve(context.Background(), 1000, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 3, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
Expand Down Expand Up @@ -116,7 +116,7 @@ func Test_Extend_Multiple_Accounts_Same_Block(t *testing.T) {
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
accounts, err := resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 2, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
Expand All @@ -134,7 +134,7 @@ func Test_Extend_Multiple_Accounts_Same_Block(t *testing.T) {
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

accounts, _, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
accounts, err = resolver.Resolve(context.Background(), 1, accountFromBase58(t, a1))
require.NoError(t, err)
require.Equal(t, 4, len(accounts))
require.Equal(t, accountFromBase58(t, a2), accounts[0])
Expand Down
5 changes: 2 additions & 3 deletions cmd/firesol/tablelookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package main
import (
"fmt"

firecore "github.com/streamingfast/firehose-core"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
accountsresolver "github.com/streamingfast/firehose-solana/accountresolver"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
kvstore "github.com/streamingfast/kvdb/store"
_ "github.com/streamingfast/kvdb/store/badger3"
_ "github.com/streamingfast/kvdb/store/bigkv"
Expand Down

0 comments on commit d0d3901

Please sign in to comment.