Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: client merging concurrent calls that shouldn't be merged #1013

Merged
merged 7 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Reader interface {
GetLatestBlock(ctx context.Context) (*rpc.GetBlockResult, error)
// GetLatestBlockHeight returns the latest block height of the node based on the configured commitment type
GetLatestBlockHeight(ctx context.Context) (uint64, error)
GetTransaction(ctx context.Context, txHash solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.GetTransactionResult, error)
GetTransaction(ctx context.Context, txHash solana.Signature) (*rpc.GetTransactionResult, error)
GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error)
GetBlocksWithLimit(ctx context.Context, startSlot uint64, limit uint64) (*rpc.BlocksResult, error)
GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error)
Expand Down Expand Up @@ -69,6 +69,7 @@ type Client struct {
log logger.Logger

// provides a duplicate function call suppression mechanism
// As a rule of thumb: if two calls passing different arguments must not be merged/deduplicated, then you must incorporate those arguments into the key.
requestGroup *singleflight.Group
}

Expand Down Expand Up @@ -120,7 +121,10 @@ func (c *Client) SlotHeightWithCommitment(ctx context.Context, commitment rpc.Co

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) {

// Include the commitment in the requestGroup key so calls with different commitments won't be merged
key := fmt.Sprintf("GetSlotHeight(%s)", commitment)
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
return c.rpc.GetSlot(ctx, commitment)
})
return v.(uint64), err
Expand All @@ -141,24 +145,16 @@ func (c *Client) GetSignaturesForAddressWithOpts(ctx context.Context, addr solan
return c.rpc.GetSignaturesForAddressWithOpts(ctx, addr, opts)
}

func (c *Client) GetTransaction(ctx context.Context, txHash solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.GetTransactionResult, error) {
func (c *Client) GetTransaction(ctx context.Context, txHash solana.Signature) (*rpc.GetTransactionResult, error) {
done := c.latency("transaction")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

if opts == nil {
opts = &rpc.GetTransactionOpts{
Encoding: solana.EncodingBase64,
}
}
if opts.Commitment == "" {
opts.Commitment = c.commitment
}

v, err, _ := c.requestGroup.Do("GetTransaction", func() (interface{}, error) {
return c.rpc.GetTransaction(ctx, txHash, opts)
// Use txHash in the key so different signatures won't be merged on concurrent calls.
key := fmt.Sprintf("GetTransaction(%s)", txHash.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible to pass custom opts. We should include them into the key or remove it from the function arguments.
I'm leaning towards second option as the method is only used in one test

Copy link
Contributor Author

@Farber98 Farber98 Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Removing this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to make some changes in the new multi_client to remove the extra param

v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
return c.rpc.GetTransaction(ctx, txHash, &rpc.GetTransactionOpts{Encoding: solana.EncodingBase64, Commitment: c.commitment})
})
return v.(*rpc.GetTransactionResult), err
}
Expand All @@ -180,7 +176,13 @@ func (c *Client) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint6
ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

v, err, _ := c.requestGroup.Do("GetBlocks", func() (interface{}, error) {
// Incorporate startSlot/endSlot into the key to differentiate concurrent calls with different ranges
endSlotStr := "nil"
if endSlot != nil {
endSlotStr = fmt.Sprint(*endSlot)
}
key := fmt.Sprintf("GetBlocks(%d,%s)", startSlot, endSlotStr)
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
return c.rpc.GetBlocks(ctx, startSlot, endSlot, c.commitment)
})
return v.(rpc.BlocksResult), err
Expand Down Expand Up @@ -347,12 +349,15 @@ func (c *Client) GetLatestBlockHeight(ctx context.Context) (uint64, error) {
}

func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) {
// get block based on slot
done := c.latency("get_block")
defer done()
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rest of the call can be replaced with GetBlock(ctx context.Context, slot uint64)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean replacing:

// Adding slot to the key so concurrent calls to GetBlock for different slots are not merged. Without including the slot,
	// it would treat all GetBlock calls as identical and merge them, returning whichever block it fetched first to all callers.
	key := fmt.Sprintf("GetBlockWithOpts(%d)", slot)
	v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
		version := uint64(0) // pull all tx types (legacy + v0)
		return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
			Commitment:                     c.commitment,
			MaxSupportedTransactionVersion: &version,
		})
	})
	return v.(*rpc.GetBlockResult), err

with

return c.rpc.GetBlock(ctx, slot)

If that's the case I think this is not ideal as we are not providing the optional parameters. Under the hood we would be calling GetBlockWithOpts but not providing the following opts:

&rpc.GetBlockOpts{
	Commitment: c.commitment,
	MaxSupportedTransactionVersion: &version,
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry. I meant to leave this comment for the GetLatestBlock. It can look as follows:

func (c *Client) GetLatestBlock(ctx context.Context) (*rpc.GetBlockResult, error) {
	// get latest confirmed slot
	slot, err := c.SlotHeightWithCommitment(ctx, c.commitment)
	if err != nil {
		return nil, fmt.Errorf("GetLatestBlock.SlotHeight: %w", err)
	}

	// get block based on slot
	done := c.latency("latest_block")
	defer done()
	return c.GetBlock(ctx, slot)
}

Instead of

func (c *Client) GetLatestBlock(ctx context.Context) (*rpc.GetBlockResult, error) {
	// get latest confirmed slot
	slot, err := c.SlotHeightWithCommitment(ctx, c.commitment)
	if err != nil {
		return nil, fmt.Errorf("GetLatestBlock.SlotHeight: %w", err)
	}

	// get block based on slot
	done := c.latency("latest_block")
	defer done()
	ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
	defer cancel()
	v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
		version := uint64(0) // pull all tx types (legacy + v0)
		return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
			Commitment:                     c.commitment,
			MaxSupportedTransactionVersion: &version,
		})
	})
	return v.(*rpc.GetBlockResult), err
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {

// Adding slot to the key so concurrent calls to GetBlock for different slots are not merged. Without including the slot,
// it would treat all GetBlock calls as identical and merge them, returning whichever block it fetched first to all callers.
key := fmt.Sprintf("GetBlockWithOpts(%d)", slot)
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
version := uint64(0) // pull all tx types (legacy + v0)
return c.rpc.GetBlockWithOpts(ctx, slot, &rpc.GetBlockOpts{
Commitment: c.commitment,
Expand All @@ -369,7 +374,9 @@ func (c *Client) GetBlocksWithLimit(ctx context.Context, startSlot uint64, limit
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()

v, err, _ := c.requestGroup.Do("GetBlocksWithLimit", func() (interface{}, error) {
// Incorporate startSlot and limit into the key to differentiate on concurrent calls.
key := fmt.Sprintf("GetBlocksWithLimit(%d,%d)", startSlot, limit)
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
return c.rpc.GetBlocksWithLimit(ctx, startSlot, limit, c.commitment)
})
return v.(*rpc.BlocksResult), err
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestClient_Writer_Integration(t *testing.T) {
assert.Nil(t, statuses[0].Err)
assert.NotNil(t, statuses[1].Err)

getTxResult, err := c.GetTransaction(ctx, sigSuccess, nil)
getTxResult, err := c.GetTransaction(ctx, sigSuccess)
assert.NoError(t, err)
assert.NotNil(t, getTxResult)

Expand Down
29 changes: 14 additions & 15 deletions pkg/solana/client/mocks/reader_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/solana/client/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func (m *MultiClient) GetLatestBlock(ctx context.Context) (*rpc.GetBlockResult,
return r.GetLatestBlock(ctx)
}

func (m *MultiClient) GetTransaction(ctx context.Context, txHash solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.GetTransactionResult, error) {
func (m *MultiClient) GetTransaction(ctx context.Context, txHash solana.Signature) (*rpc.GetTransactionResult, error) {
r, err := m.getClient()
if err != nil {
return nil, err
}

return r.GetTransaction(ctx, txHash, opts)
return r.GetTransaction(ctx, txHash)
}

func (m *MultiClient) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error) {
Expand Down
Loading