Skip to content

Commit

Permalink
Tests for get block job
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 14, 2025
1 parent 0dd1745 commit 5a75dae
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 20 deletions.
8 changes: 6 additions & 2 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

// MaxSupportTransactionVersion defines max transaction version to return in responses.
// If the requested block contains a transaction with a higher version, an error will be returned.
const MaxSupportTransactionVersion = uint64(0) // (legacy + v0)

const (
DevnetGenesisHash = "EtWTRABZaYq6iMfeYKouRu166VU2xqa1wcaWoxPkrZBG"
TestnetGenesisHash = "4uhcVJyU9pJkvQyS88uRDiswHXSCkY3zQawwpjk2NsNY"
Expand Down Expand Up @@ -324,7 +328,7 @@ func (c *Client) GetLatestBlock(ctx context.Context) (*rpc.GetBlockResult, error
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
version := uint64(0) // pull all tx types (legacy + v0)
version := MaxSupportTransactionVersion // pull all tx types (legacy + v0)
return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
Commitment: c.commitment,
MaxSupportedTransactionVersion: &version,
Expand Down Expand Up @@ -353,7 +357,7 @@ func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
version := uint64(0) // pull all tx types (legacy + v0)
version := MaxSupportTransactionVersion
return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
Commitment: c.commitment,
MaxSupportedTransactionVersion: &version,
Expand Down
35 changes: 17 additions & 18 deletions pkg/solana/logpoller/job_get_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,27 @@ import (

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
)

// getBlockJob is a job that fetches transaction signatures from a block and loads
// the job queue with getTransactionLogsJobs for each transaction found in the block.
type getBlockJob struct {
slotNumber uint64
client RPCClient
blocks chan Block
done chan struct{}
slotNumber uint64
client RPCClient
blocks chan Block
done chan struct{}
parseProgramLogs func(logs []string) []ProgramOutput
}

func newGetBlockJob(client RPCClient, blocks chan Block, slotNumber uint64) *getBlockJob {
return &getBlockJob{
client: client,
blocks: blocks,
slotNumber: slotNumber,
done: make(chan struct{}),
client: client,
blocks: blocks,
slotNumber: slotNumber,
done: make(chan struct{}),
parseProgramLogs: parseProgramLogs,
}
}

Expand All @@ -36,14 +40,11 @@ func (j *getBlockJob) Done() <-chan struct{} {

func (j *getBlockJob) Run(ctx context.Context) error {
var excludeRewards bool
// TODO: move min version definition to an RPC Client
// NOTE: if max supported transaction version is changed after creation of a block that contains transactions of a new version
// we at risk of producing duplicate events! To avoid this we'll need to do block based migration.
version := uint64(0) // pull all tx types (legacy + v0)
version := client.MaxSupportTransactionVersion
block, err := j.client.GetBlockWithOpts(
ctx,
j.slotNumber,
// NOTE: any change to the filtering argmuments may affect calculation of logIndex, which to lead to events duplication.
// NOTE: any change to the filtering arguments may affect calculation of logIndex, which to lead to events duplication.
&rpc.GetBlockOpts{
Encoding: solana.EncodingBase64,
Commitment: rpc.CommitmentFinalized,
Expand Down Expand Up @@ -72,10 +73,10 @@ func (j *getBlockJob) Run(ctx context.Context) error {
detail.trxIdx = idx
tx, err := txWithMeta.GetTransaction()
if err != nil {
return fmt.Errorf("failed to parse transaction %d in slot %d: %w", idx, txWithMeta.Slot, err)
return fmt.Errorf("failed to parse transaction %d in slot %d: %w", idx, j.slotNumber, err)
}
if len(tx.Signatures) == 0 {
return fmt.Errorf("expected all transactions to have at least one signature %d in slot %d", idx, txWithMeta.Slot)
return fmt.Errorf("expected all transactions to have at least one signature %d in slot %d", idx, j.slotNumber)
}
detail.trxSig = tx.Signatures[0] // according to Solana docs fist signature is used as ID

Expand All @@ -99,11 +100,9 @@ func (j *getBlockJob) Run(ctx context.Context) error {
}

func (j *getBlockJob) messagesToEvents(messages []string, detail eventDetail) []ProgramEvent {
// TODO: changes to parsing might cause changes in logIdx generate, we might want to find a more stable way of doing it.
var logIdx uint
// TODO: only parse logs produced by CL contracts, otherwise if we enable custom user calls, it will be possible to forge a msg.
events := make([]ProgramEvent, 0, len(messages))
for _, outputs := range parseProgramLogs(messages) {
for _, outputs := range j.parseProgramLogs(messages) {
for i, event := range outputs.Events {
event.SlotNumber = detail.slotNumber
event.BlockHeight = detail.blockHeight
Expand Down
143 changes: 143 additions & 0 deletions pkg/solana/logpoller/job_get_block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package logpoller

import (
"context"
"errors"
"testing"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks"
)

func TestGetBlockJob(t *testing.T) {
const slotNumber = uint64(42)
t.Run("String contains slot number", func(t *testing.T) {
job := newGetBlockJob(nil, nil, slotNumber)
require.Equal(t, "getBlock for slotNumber: 42", job.String())
})
t.Run("Error if fails to get block", func(t *testing.T) {
client := mocks.NewRPCClient(t)
expectedError := errors.New("rpc failed")
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(nil, expectedError).Once()
job := newGetBlockJob(client, make(chan Block), slotNumber)
err := job.Run(tests.Context(t))
require.ErrorIs(t, err, expectedError)
})
t.Run("Error if fails to get transaction", func(t *testing.T) {
client := mocks.NewRPCClient(t)
block := rpc.GetBlockResult{Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes([]byte("{"))}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), slotNumber)
err := job.Run(tests.Context(t))
require.ErrorContains(t, err, "failed to parse transaction 0 in slot 42")
})
t.Run("Error if Tx has no signatures", func(t *testing.T) {
client := mocks.NewRPCClient(t)
tx := solana.Transaction{}
txB, err := tx.MarshalBinary()
require.NoError(t, err)
block := rpc.GetBlockResult{Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB)}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), slotNumber)
err = job.Run(tests.Context(t))
require.ErrorContains(t, err, "expected all transactions to have at least one signature 0 in slot 42")
})
t.Run("Can abort even if no one waits for result", func(t *testing.T) {
client := mocks.NewRPCClient(t)
tx := solana.Transaction{Signatures: make([]solana.Signature, 1)}
txB, err := tx.MarshalBinary()
require.NoError(t, err)
block := rpc.GetBlockResult{Transactions: []rpc.TransactionWithMeta{{Transaction: rpc.DataBytesOrJSONFromBytes(txB), Meta: &rpc.TransactionMeta{}}}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block), slotNumber)
ctx, cancel := context.WithCancel(tests.Context(t))
cancel()
err = job.Run(ctx)
require.ErrorIs(t, err, context.Canceled)
select {
case <-job.Done():
require.Fail(t, "expected done channel to be open as job was aborted")
default:
}
})
t.Run("Happy path", func(t *testing.T) {
client := mocks.NewRPCClient(t)
tx1Signature := solana.Signature{4, 5, 6}
tx2Signature := solana.Signature{7, 8, 9}
txSigToDataBytes := func(sig solana.Signature) *rpc.DataBytesOrJSON {
tx := solana.Transaction{Signatures: []solana.Signature{sig}}
binary, err := tx.MarshalBinary()
require.NoError(t, err)
return rpc.DataBytesOrJSONFromBytes(binary)
}
txWithMeta1 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(tx1Signature), Meta: &rpc.TransactionMeta{LogMessages: []string{"log1", "log2"}}}
txWithMeta2 := rpc.TransactionWithMeta{Transaction: txSigToDataBytes(tx2Signature), Meta: &rpc.TransactionMeta{LogMessages: []string{"log3"}}}
height := uint64(41)
block := rpc.GetBlockResult{BlockHeight: &height, Blockhash: solana.Hash{1, 2, 3}, Transactions: []rpc.TransactionWithMeta{txWithMeta1, txWithMeta2}}
client.EXPECT().GetBlockWithOpts(mock.Anything, slotNumber, mock.Anything).Return(&block, nil).Once()
job := newGetBlockJob(client, make(chan Block, 1), slotNumber)
job.parseProgramLogs = func(logs []string) []ProgramOutput {
result := ProgramOutput{}
for _, l := range logs {
result.Events = append(result.Events, ProgramEvent{Data: l})
}
return []ProgramOutput{result}
}
err := job.Run(tests.Context(t))
require.NoError(t, err)
result := <-job.blocks
require.Equal(t, Block{
SlotNumber: slotNumber,
BlockHash: block.Blockhash,
Events: []ProgramEvent{
{
BlockData: BlockData{
SlotNumber: slotNumber,
BlockHeight: height,
BlockHash: block.Blockhash,
TransactionHash: tx1Signature,
TransactionLogIndex: 0,
TransactionIndex: 0,
},
Prefix: "",
Data: "log1",
},
{
BlockData: BlockData{
SlotNumber: slotNumber,
BlockHeight: height,
BlockHash: block.Blockhash,
TransactionHash: tx1Signature,
TransactionLogIndex: 1,
TransactionIndex: 0,
},
Prefix: "",
Data: "log2",
},
{
BlockData: BlockData{
SlotNumber: slotNumber,
BlockHeight: height,
BlockHash: block.Blockhash,
TransactionHash: tx2Signature,
TransactionLogIndex: 0,
TransactionIndex: 1,
},
Prefix: "",
Data: "log3",
},
},
}, result)
select {
case <-job.Done():
default:
t.Fatal("expected job to be done")
}
})
}

0 comments on commit 5a75dae

Please sign in to comment.