From e9527c429ccb3d49d769610844e914b01d5b13df Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 14 Jan 2025 20:22:35 +0100 Subject: [PATCH] Encoded log collector happy path test --- pkg/solana/logpoller/job_get_block.go | 4 + pkg/solana/logpoller/job_get_block_test.go | 11 +- pkg/solana/logpoller/loader.go | 2 - pkg/solana/logpoller/loader_test.go | 814 +++++---------------- pkg/solana/logpoller/mocks/rpc_client.go | 120 --- 5 files changed, 195 insertions(+), 756 deletions(-) diff --git a/pkg/solana/logpoller/job_get_block.go b/pkg/solana/logpoller/job_get_block.go index 75beb86a7..59745cd5c 100644 --- a/pkg/solana/logpoller/job_get_block.go +++ b/pkg/solana/logpoller/job_get_block.go @@ -2,6 +2,7 @@ package logpoller import ( "context" + "errors" "fmt" "github.com/gagliardetto/solana-go" @@ -75,6 +76,9 @@ func (j *getBlockJob) Run(ctx context.Context) error { events := make([]ProgramEvent, 0, len(block.Transactions)) for idx, txWithMeta := range block.Transactions { detail.trxIdx = idx + if txWithMeta.Transaction == nil { + return fmt.Errorf("failed to parse transaction %d in slot %d: %w", idx, j.slotNumber, errors.New("missing transaction field")) + } tx, err := txWithMeta.GetTransaction() if err != nil { return fmt.Errorf("failed to parse transaction %d in slot %d: %w", idx, j.slotNumber, err) diff --git a/pkg/solana/logpoller/job_get_block_test.go b/pkg/solana/logpoller/job_get_block_test.go index 742b7bcf2..af0c13fb4 100644 --- a/pkg/solana/logpoller/job_get_block_test.go +++ b/pkg/solana/logpoller/job_get_block_test.go @@ -34,6 +34,15 @@ func TestGetBlockJob(t *testing.T) { err := job.Run(tests.Context(t)) require.ErrorIs(t, err, expectedError) }) + t.Run("Error if transaction field is not present", func(t *testing.T) { + client := mocks.NewRPCClient(t) + lggr := logger.Sugared(logger.Test(t)) + block := rpc.GetBlockResult{Transactions: []rpc.TransactionWithMeta{{Transaction: nil}}} + client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() + job := newGetBlockJob(client, make(chan Block), lggr, slotNumber) + err := job.Run(tests.Context(t)) + require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42: missing transaction field") + }) t.Run("Error if fails to get transaction", func(t *testing.T) { client := mocks.NewRPCClient(t) lggr := logger.Sugared(logger.Test(t)) @@ -100,7 +109,7 @@ func TestGetBlockJob(t *testing.T) { txWithMeta1 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(tx1Signature), Meta: &rpc.TransactionMeta{LogMessages: []string{"log1", "log2"}}} txWithMeta2 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(tx2Signature), Meta: &rpc.TransactionMeta{LogMessages: []string{"log3"}}} // tx3 must be ignored due to error - txWithMeta3 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(tx2Signature), Meta: &rpc.TransactionMeta{LogMessages: []string{"log4"}, Err: fmt.Errorf("some error")}} + txWithMeta3 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(solana.Signature{10, 11}), Meta: &rpc.TransactionMeta{LogMessages: []string{"log4"}, Err: fmt.Errorf("some error")}} height := uint64(41) block := rpc.GetBlockResult{BlockHeight: &height, Blockhash: solana.Hash{1, 2, 3}, Transactions: []rpc.TransactionWithMeta{txWithMeta1, txWithMeta2, txWithMeta3}} client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once() diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 7bb704624..886d5c2ea 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -31,8 +31,6 @@ type ProgramEventProcessor interface { } type RPCClient interface { - GetLatestBlockhash(ctx context.Context, commitment rpc.CommitmentType) (out *rpc.GetLatestBlockhashResult, err error) - GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (out rpc.BlocksResult, err error) GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) GetSlot(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index ed7ffacc2..46a438ca4 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -1,635 +1,183 @@ package logpoller_test -// -//import ( -// "context" -// "crypto/rand" -// "reflect" -// "sync" -// "sync/atomic" -// "testing" -// "time" -// -// "github.com/gagliardetto/solana-go" -// "github.com/gagliardetto/solana-go/rpc" -// "github.com/stretchr/testify/assert" -// "github.com/stretchr/testify/mock" -// "github.com/stretchr/testify/require" -// -// "github.com/smartcontractkit/chainlink-common/pkg/logger" -// "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" -// -// "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" -// "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" -//) -// -//var ( -// messages = []string{ -// "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 invoke [1]", -// "Program log: Instruction: CreateLog", -// "Program data: HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", -// "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 consumed 1477 of 200000 compute units", -// "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 success", -// } -//) -// -//func TestEncodedLogCollector_StartClose(t *testing.T) { -// t.Parallel() -// -// client := new(mocks.RPCClient) -// ctx := tests.Context(t) -// -// collector := logpoller.NewEncodedLogCollector(client, logger.Nop()) -// -// assert.NoError(t, collector.Start(ctx)) -// assert.NoError(t, collector.Close()) -//} -// -//func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { -// t.Parallel() -// -// client := new(mocks.RPCClient) -// parser := new(testParser) -// ctx := tests.Context(t) -// -// collector := logpoller.NewEncodedLogCollector(client, logger.Nop()) -// -// require.NoError(t, collector.Start(ctx)) -// t.Cleanup(func() { -// require.NoError(t, collector.Close()) -// }) -// -// var latest atomic.Uint64 -// -// latest.Store(uint64(40)) -// -// client.EXPECT(). -// GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). -// RunAndReturn(latestBlockhashReturnFunc(&latest)) -// -// client.EXPECT(). -// GetBlocks( -// mock.Anything, -// mock.MatchedBy(getBlocksStartValMatcher), -// mock.MatchedBy(getBlocksEndValMatcher(&latest)), -// rpc.CommitmentFinalized, -// ). -// RunAndReturn(getBlocksReturnFunc(false)) -// -// client.EXPECT(). -// GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). -// RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { -// height := slot - 1 -// -// result := rpc.GetBlockResult{ -// Transactions: []rpc.TransactionWithMeta{}, -// Signatures: []solana.Signature{}, -// BlockHeight: &height, -// } -// -// _, _ = rand.Read(result.Blockhash[:]) -// -// if slot == 42 { -// var sig solana.Signature -// _, _ = rand.Read(sig[:]) -// -// result.Signatures = []solana.Signature{sig} -// result.Transactions = []rpc.TransactionWithMeta{ -// { -// Meta: &rpc.TransactionMeta{ -// LogMessages: messages, -// }, -// }, -// } -// } -// -// return &result, nil -// }) -// -// tests.AssertEventually(t, func() bool { -// return parser.Called() -// }) -//} -// -//func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { -// t.Parallel() -// -// client := new(mocks.RPCClient) -// parser := new(testParser) -// ctx := tests.Context(t) -// -// collector := logpoller.NewEncodedLogCollector(client, parser, logger.Nop()) -// -// require.NoError(t, collector.Start(ctx)) -// t.Cleanup(func() { -// require.NoError(t, collector.Close()) -// }) -// -// var latest atomic.Uint64 -// -// latest.Store(uint64(40)) -// -// slots := []uint64{44, 43, 42, 41} -// sigs := make([]solana.Signature, len(slots)) -// hashes := make([]solana.Hash, len(slots)) -// scrambler := &slotUnsync{ch: make(chan struct{})} -// -// for idx := range len(sigs) { -// _, _ = rand.Read(sigs[idx][:]) -// _, _ = rand.Read(hashes[idx][:]) -// } -// -// client.EXPECT(). -// GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). -// RunAndReturn(latestBlockhashReturnFunc(&latest)) -// -// client.EXPECT(). -// GetBlocks( -// mock.Anything, -// mock.MatchedBy(getBlocksStartValMatcher), -// mock.MatchedBy(getBlocksEndValMatcher(&latest)), -// rpc.CommitmentFinalized, -// ). -// RunAndReturn(getBlocksReturnFunc(false)) -// -// client.EXPECT(). -// GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). -// RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { -// slotIdx := -1 -// for idx, slt := range slots { -// if slt == slot { -// slotIdx = idx -// -// break -// } -// } -// -// // imitate loading block data out of order -// // every other block must wait for the block previous -// scrambler.next() -// -// height := slot - 1 -// -// if slotIdx == -1 { -// var hash solana.Hash -// _, _ = rand.Read(hash[:]) -// -// return &rpc.GetBlockResult{ -// Blockhash: hash, -// Transactions: []rpc.TransactionWithMeta{}, -// Signatures: []solana.Signature{}, -// BlockHeight: &height, -// }, nil -// } -// -// return &rpc.GetBlockResult{ -// Blockhash: hashes[slotIdx], -// Transactions: []rpc.TransactionWithMeta{ -// { -// Meta: &rpc.TransactionMeta{ -// LogMessages: messages, -// }, -// }, -// }, -// Signatures: []solana.Signature{sigs[slotIdx]}, -// BlockHeight: &height, -// }, nil -// }) -// -// tests.AssertEventually(t, func() bool { -// return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ -// { -// BlockData: logpoller.BlockData{ -// SlotNumber: 41, -// BlockHeight: 40, -// BlockHash: hashes[3], -// TransactionHash: sigs[3], -// TransactionIndex: 0, -// TransactionLogIndex: 0, -// }, -// Prefix: ">", -// Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", -// }, -// { -// BlockData: logpoller.BlockData{ -// SlotNumber: 42, -// BlockHeight: 41, -// BlockHash: hashes[2], -// TransactionHash: sigs[2], -// TransactionIndex: 0, -// TransactionLogIndex: 0, -// }, -// Prefix: ">", -// Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", -// }, -// { -// BlockData: logpoller.BlockData{ -// SlotNumber: 43, -// BlockHeight: 42, -// BlockHash: hashes[1], -// TransactionHash: sigs[1], -// TransactionIndex: 0, -// TransactionLogIndex: 0, -// }, -// Prefix: ">", -// Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", -// }, -// { -// BlockData: logpoller.BlockData{ -// SlotNumber: 44, -// BlockHeight: 43, -// BlockHash: hashes[0], -// TransactionHash: sigs[0], -// TransactionIndex: 0, -// TransactionLogIndex: 0, -// }, -// Prefix: ">", -// Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", -// }, -// }) -// }) -// -// client.AssertExpectations(t) -//} -// -//type slotUnsync struct { -// ch chan struct{} -// waiting atomic.Bool -//} -// -//func (u *slotUnsync) next() { -// if u.waiting.Load() { -// u.waiting.Store(false) -// -// <-u.ch -// -// return -// } -// -// u.waiting.Store(true) -// -// u.ch <- struct{}{} -//} -// -////func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { -//// t.Parallel() -//// -//// client := new(mocks.RPCClient) -//// parser := new(testParser) -//// ctx := tests.Context(t) -//// -//// collector := logpoller.NewEncodedLogCollector(client, parser, logger.Nop()) -//// -//// require.NoError(t, collector.Start(ctx)) -//// t.Cleanup(func() { -//// require.NoError(t, collector.Close()) -//// }) -//// -//// pubKey := solana.PublicKey{2, 1, 4, 2} -//// slots := []uint64{44, 43, 42} -//// sigs := make([]solana.Signature, len(slots)*2) -//// -//// for idx := range len(sigs) { -//// _, _ = rand.Read(sigs[idx][:]) -//// } -//// -//// var latest atomic.Uint64 -//// -//// latest.Store(uint64(40)) -//// -//// // GetLatestBlockhash might be called at start-up; make it take some time because the result isn't needed for this test -//// client.EXPECT(). -//// GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). -//// RunAndReturn(latestBlockhashReturnFunc(&latest)). -//// After(2 * time.Second). -//// Maybe() -//// -//// client.EXPECT(). -//// GetBlocks( -//// mock.Anything, -//// mock.MatchedBy(getBlocksStartValMatcher), -//// mock.MatchedBy(getBlocksEndValMatcher(&latest)), -//// rpc.CommitmentFinalized, -//// ). -//// RunAndReturn(getBlocksReturnFunc(true)) -//// -//// client.EXPECT(). -//// GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything). -//// RunAndReturn(func(_ context.Context, pk solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { -//// ret := []*rpc.TransactionSignature{} -//// -//// if opts != nil && opts.Before.String() == (solana.Signature{}).String() { -//// for idx := range slots { -//// ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[idx*2]}) -//// ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[(idx*2)+1]}) -//// } -//// } -//// -//// return ret, nil -//// }) -//// -//// client.EXPECT(). -//// GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). -//// RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { -//// idx := -1 -//// for sIdx, slt := range slots { -//// if slt == slot { -//// idx = sIdx -//// -//// break -//// } -//// } -//// -//// height := slot - 1 -//// -//// if idx == -1 { -//// return &rpc.GetBlockResult{ -//// Transactions: []rpc.TransactionWithMeta{}, -//// Signatures: []solana.Signature{}, -//// BlockHeight: &height, -//// }, nil -//// } -//// -//// return &rpc.GetBlockResult{ -//// Transactions: []rpc.TransactionWithMeta{ -//// { -//// Meta: &rpc.TransactionMeta{ -//// LogMessages: messages, -//// }, -//// }, -//// { -//// Meta: &rpc.TransactionMeta{ -//// LogMessages: messages, -//// }, -//// }, -//// }, -//// Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, -//// BlockHeight: &height, -//// }, nil -//// }) -//// -//// assert.NoError(t, collector.BackfillForAddress(ctx, pubKey.String(), 42)) -//// -//// tests.AssertEventually(t, func() bool { -//// return parser.Count() == 6 -//// }) -////} -// -//func BenchmarkEncodedLogCollector(b *testing.B) { -// ctx := tests.Context(b) -// -// ticker := time.NewTimer(500 * time.Millisecond) -// defer ticker.Stop() -// -// parser := new(testParser) -// blockProducer := &testBlockProducer{ -// b: b, -// nextSlot: 10, -// blockSigs: make(map[uint64][]solana.Signature), -// sigs: make(map[string]bool), -// } -// -// collector := logpoller.NewEncodedLogCollector(blockProducer, parser, logger.Nop()) -// -// require.NoError(b, collector.Start(ctx)) -// b.Cleanup(func() { -// require.NoError(b, collector.Close()) -// }) -// -// b.ReportAllocs() -// b.ResetTimer() -// -//BenchLoop: -// for i := 0; i < b.N; i++ { -// select { -// case <-ticker.C: -// blockProducer.incrementSlot() -// case <-ctx.Done(): -// break BenchLoop -// default: -// blockProducer.makeEvent() -// } -// } -// -// b.ReportMetric(float64(parser.Count())/b.Elapsed().Seconds(), "events/sec") -// b.ReportMetric(float64(blockProducer.Count())/b.Elapsed().Seconds(), "rcp_calls/sec") -//} -// -//type testBlockProducer struct { -// b *testing.B -// -// mu sync.RWMutex -// nextSlot uint64 -// blockSigs map[uint64][]solana.Signature -// sigs map[string]bool -// count uint64 -//} -// -//func (p *testBlockProducer) incrementSlot() { -// p.b.Helper() -// -// p.mu.Lock() -// defer p.mu.Unlock() -// -// p.nextSlot++ -// p.blockSigs[p.nextSlot] = make([]solana.Signature, 0, 100) -//} -// -//func (p *testBlockProducer) makeEvent() { -// p.b.Helper() -// -// p.mu.Lock() -// defer p.mu.Unlock() -// -// var sig solana.Signature -// -// _, _ = rand.Read(sig[:]) -// -// p.blockSigs[p.nextSlot] = append(p.blockSigs[p.nextSlot], sig) -// p.sigs[sig.String()] = true -//} -// -//func (p *testBlockProducer) Count() uint64 { -// p.mu.RLock() -// defer p.mu.RUnlock() -// -// return p.count -//} -// -//func (p *testBlockProducer) GetLatestBlockhash(_ context.Context, _ rpc.CommitmentType) (out *rpc.GetLatestBlockhashResult, err error) { -// p.b.Helper() -// -// p.mu.Lock() -// p.count++ -// p.mu.Unlock() -// -// p.mu.RLock() -// defer p.mu.RUnlock() -// -// return &rpc.GetLatestBlockhashResult{ -// RPCContext: rpc.RPCContext{ -// Context: rpc.Context{ -// Slot: p.nextSlot, -// }, -// }, -// }, nil -//} -// -//func (p *testBlockProducer) GetBlocks(_ context.Context, startSlot uint64, endSlot *uint64, _ rpc.CommitmentType) (out rpc.BlocksResult, err error) { -// p.b.Helper() -// -// p.mu.Lock() -// p.count++ -// p.mu.Unlock() -// -// blocks := make([]uint64, *endSlot-startSlot) -// for idx := range blocks { -// blocks[idx] = startSlot + uint64(idx) -// } -// -// return rpc.BlocksResult(blocks), nil -//} -// -//func (p *testBlockProducer) GetBlockWithOpts(_ context.Context, block uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { -// p.b.Helper() -// -// p.mu.Lock() -// defer p.mu.Unlock() -// -// var result rpc.GetBlockResult -// -// sigs := p.blockSigs[block] -// -// switch opts.TransactionDetails { -// case rpc.TransactionDetailsFull: -// result.Transactions = make([]rpc.TransactionWithMeta, len(sigs)) -// for idx, sig := range sigs { -// delete(p.sigs, sig.String()) -// -// result.Transactions[idx] = rpc.TransactionWithMeta{ -// Slot: block, -// Meta: &rpc.TransactionMeta{ -// LogMessages: messages, -// }, -// } -// } -// case rpc.TransactionDetailsSignatures: -// result.Signatures = sigs -// delete(p.blockSigs, block) -// case rpc.TransactionDetailsNone: -// fallthrough -// default: -// } -// -// p.count++ -// result.BlockHeight = &block -// -// return &result, nil -//} -// -//func (p *testBlockProducer) GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { -// p.b.Helper() -// -// return nil, nil -//} -// -//func (p *testBlockProducer) GetTransaction(_ context.Context, sig solana.Signature, _ *rpc.GetTransactionOpts) (*rpc.GetTransactionResult, error) { -// p.b.Helper() -// -// p.mu.Lock() -// defer p.mu.Unlock() -// -// var msgs []string -// -// p.count++ -// _, ok := p.sigs[sig.String()] -// if ok { -// msgs = messages -// } -// -// delete(p.sigs, sig.String()) -// -// return &rpc.GetTransactionResult{ -// Meta: &rpc.TransactionMeta{ -// LogMessages: msgs, -// }, -// }, nil -//} -// -//type testParser struct { -// called atomic.Bool -// mu sync.Mutex -// events []logpoller.ProgramEvent -//} -// -//func (p *testParser) Process(block logpoller.Block) error { -// p.called.Store(true) -// -// p.mu.Lock() -// p.events = append(p.events, block.Events...) -// p.mu.Unlock() -// -// return nil -//} -// -//func (p *testParser) Called() bool { -// return p.called.Load() -//} -// -//func (p *testParser) Count() uint64 { -// p.mu.Lock() -// defer p.mu.Unlock() -// -// return uint64(len(p.events)) -//} -// -//func (p *testParser) Events() []logpoller.ProgramEvent { -// p.mu.Lock() -// defer p.mu.Unlock() -// -// return p.events -//} -// -//func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { -// return func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { -// defer func() { -// latest.Store(latest.Load() + 2) -// }() -// -// return &rpc.GetLatestBlockhashResult{ -// RPCContext: rpc.RPCContext{ -// Context: rpc.Context{ -// Slot: latest.Load(), -// }, -// }, -// Value: &rpc.LatestBlockhashResult{ -// LastValidBlockHeight: latest.Load() - 1, -// }, -// }, nil -// } -//} -// -//func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error) { -// return func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { -// blocks := []uint64{} -// -// if !empty { -// blocks = make([]uint64, *u2-u1+1) -// for idx := range blocks { -// blocks[idx] = u1 + uint64(idx) -// } -// } -// -// return rpc.BlocksResult(blocks), nil -// } -//} -// -//func getBlocksStartValMatcher(val uint64) bool { -// return val > uint64(0) -//} -// -//func getBlocksEndValMatcher(latest *atomic.Uint64) func(*uint64) bool { -// return func(val *uint64) bool { -// return val != nil && *val <= latest.Load() -// } -//} +import ( + "context" + "crypto/rand" + "slices" + "sync/atomic" + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" +) + +var ( + messages = []string{ + "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 invoke [1]", + "Program log: Instruction: CreateLog", + "Program data: HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 consumed 1477 of 200000 compute units", + "Program J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4 success", + } +) + +func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { + t.Parallel() + + client := new(mocks.RPCClient) + ctx := tests.Context(t) + + collector := logpoller.NewEncodedLogCollector(client, logger.TestSugared(t)) + + require.NoError(t, collector.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, collector.Close()) + }) + + var latest atomic.Uint64 + + latest.Store(uint64(40)) + + address, err := solana.PublicKeyFromBase58("J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4") + require.NoError(t, err) + slots := []uint64{44, 43, 42, 41} + var txSigsResponse []*rpc.TransactionSignature + for _, slot := range slots { + txSigsResponse = append(txSigsResponse, &rpc.TransactionSignature{Slot: slot}) + } + client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, key solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + switch *opts.MinContextSlot { + case 44: + return txSigsResponse, nil + case 41: + return nil, nil + default: + panic("unexpected call") + } + }).Twice() + + sigs := make([]solana.Signature, len(slots)) + hashes := make([]solana.Hash, len(slots)) + scrambler := &slotUnsync{ch: make(chan struct{})} + + for idx := range len(sigs) { + _, _ = rand.Read(sigs[idx][:]) + _, _ = rand.Read(hashes[idx][:]) + } + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + slotIdx := slices.Index(slots, slot) + if slotIdx == -1 { + require.Fail(t, "trying to get block for unexpected slot", slot) + } + + // imitate loading block data out of order + // every other block must wait for the block previous + scrambler.next() + + height := slot - 1 + + tx := solana.Transaction{Signatures: []solana.Signature{sigs[slotIdx]}} + binaryTx, err := tx.MarshalBinary() + require.NoError(t, err) + return &rpc.GetBlockResult{ + Blockhash: hashes[slotIdx], + Transactions: []rpc.TransactionWithMeta{ + { + Transaction: rpc.DataBytesOrJSONFromBytes(binaryTx), + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + }, + BlockHeight: &height, + }, nil + }) + + results, cleanUp, err := collector.BackfillForAddresses(tests.Context(t), []logpoller.PublicKey{logpoller.PublicKey(address)}, 41, 44) + require.NoError(t, err) + defer cleanUp() + var events []logpoller.ProgramEvent + for event := range results { + events = append(events, event.Events...) + } + + require.Equal(t, []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 41, + BlockHeight: 40, + BlockHash: hashes[3], + TransactionHash: sigs[3], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockHash: hashes[2], + TransactionHash: sigs[2], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 44, + BlockHeight: 43, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }, events) +} + +type slotUnsync struct { + ch chan struct{} + waiting atomic.Bool +} + +func (u *slotUnsync) next() { + if u.waiting.Load() { + u.waiting.Store(false) + + <-u.ch + + return + } + + u.waiting.Store(true) + + u.ch <- struct{}{} +} diff --git a/pkg/solana/logpoller/mocks/rpc_client.go b/pkg/solana/logpoller/mocks/rpc_client.go index a811d148e..db60d29ac 100644 --- a/pkg/solana/logpoller/mocks/rpc_client.go +++ b/pkg/solana/logpoller/mocks/rpc_client.go @@ -85,126 +85,6 @@ func (_c *RPCClient_GetBlockWithOpts_Call) RunAndReturn(run func(context.Context return _c } -// GetBlocks provides a mock function with given fields: ctx, startSlot, endSlot, commitment -func (_m *RPCClient) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (rpc.BlocksResult, error) { - ret := _m.Called(ctx, startSlot, endSlot, commitment) - - if len(ret) == 0 { - panic("no return value specified for GetBlocks") - } - - var r0 rpc.BlocksResult - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error)); ok { - return rf(ctx, startSlot, endSlot, commitment) - } - if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64, rpc.CommitmentType) rpc.BlocksResult); ok { - r0 = rf(ctx, startSlot, endSlot, commitment) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(rpc.BlocksResult) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, uint64, *uint64, rpc.CommitmentType) error); ok { - r1 = rf(ctx, startSlot, endSlot, commitment) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RPCClient_GetBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBlocks' -type RPCClient_GetBlocks_Call struct { - *mock.Call -} - -// GetBlocks is a helper method to define mock.On call -// - ctx context.Context -// - startSlot uint64 -// - endSlot *uint64 -// - commitment rpc.CommitmentType -func (_e *RPCClient_Expecter) GetBlocks(ctx interface{}, startSlot interface{}, endSlot interface{}, commitment interface{}) *RPCClient_GetBlocks_Call { - return &RPCClient_GetBlocks_Call{Call: _e.mock.On("GetBlocks", ctx, startSlot, endSlot, commitment)} -} - -func (_c *RPCClient_GetBlocks_Call) Run(run func(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType)) *RPCClient_GetBlocks_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(*uint64), args[3].(rpc.CommitmentType)) - }) - return _c -} - -func (_c *RPCClient_GetBlocks_Call) Return(out rpc.BlocksResult, err error) *RPCClient_GetBlocks_Call { - _c.Call.Return(out, err) - return _c -} - -func (_c *RPCClient_GetBlocks_Call) RunAndReturn(run func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error)) *RPCClient_GetBlocks_Call { - _c.Call.Return(run) - return _c -} - -// GetLatestBlockhash provides a mock function with given fields: ctx, commitment -func (_m *RPCClient) GetLatestBlockhash(ctx context.Context, commitment rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { - ret := _m.Called(ctx, commitment) - - if len(ret) == 0 { - panic("no return value specified for GetLatestBlockhash") - } - - var r0 *rpc.GetLatestBlockhashResult - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error)); ok { - return rf(ctx, commitment) - } - if rf, ok := ret.Get(0).(func(context.Context, rpc.CommitmentType) *rpc.GetLatestBlockhashResult); ok { - r0 = rf(ctx, commitment) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*rpc.GetLatestBlockhashResult) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, rpc.CommitmentType) error); ok { - r1 = rf(ctx, commitment) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RPCClient_GetLatestBlockhash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestBlockhash' -type RPCClient_GetLatestBlockhash_Call struct { - *mock.Call -} - -// GetLatestBlockhash is a helper method to define mock.On call -// - ctx context.Context -// - commitment rpc.CommitmentType -func (_e *RPCClient_Expecter) GetLatestBlockhash(ctx interface{}, commitment interface{}) *RPCClient_GetLatestBlockhash_Call { - return &RPCClient_GetLatestBlockhash_Call{Call: _e.mock.On("GetLatestBlockhash", ctx, commitment)} -} - -func (_c *RPCClient_GetLatestBlockhash_Call) Run(run func(ctx context.Context, commitment rpc.CommitmentType)) *RPCClient_GetLatestBlockhash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(rpc.CommitmentType)) - }) - return _c -} - -func (_c *RPCClient_GetLatestBlockhash_Call) Return(out *rpc.GetLatestBlockhashResult, err error) *RPCClient_GetLatestBlockhash_Call { - _c.Call.Return(out, err) - return _c -} - -func (_c *RPCClient_GetLatestBlockhash_Call) RunAndReturn(run func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error)) *RPCClient_GetLatestBlockhash_Call { - _c.Call.Return(run) - return _c -} - // GetSignaturesForAddressWithOpts provides a mock function with given fields: _a0, _a1, _a2 func (_m *RPCClient) GetSignaturesForAddressWithOpts(_a0 context.Context, _a1 solana.PublicKey, _a2 *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { ret := _m.Called(_a0, _a1, _a2)