Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: detect chain reorgs #411

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ packages:
github.com/xmtp/xmtpd/pkg/indexer:
interfaces:
IBlockTracker:
ChainReorgHandler:
github.com/xmtp/xmtpd/pkg/blockchain:
interfaces:
ChainClient:
Expand Down
1 change: 1 addition & 0 deletions pkg/blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ChainClient interface {
ethereum.BlockNumberReader
ethereum.LogFilterer
ethereum.ChainIDReader
ethereum.ChainReader
}

type TransactionSigner interface {
Expand Down
34 changes: 27 additions & 7 deletions pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,21 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent(
contractAddress common.Address,
topics []common.Hash,
maxDisconnectTime time.Duration,
) <-chan types.Log {
) (<-chan types.Log, chan<- uint64) {
eventChannel := make(chan types.Log, 100)
reorgChannel := make(chan uint64, 1)
c.contractConfigs = append(
c.contractConfigs,
contractConfig{fromBlock, contractAddress, topics, eventChannel, maxDisconnectTime},
contractConfig{
fromBlock,
contractAddress,
topics,
eventChannel,
reorgChannel,
maxDisconnectTime,
},
)
return eventChannel
return eventChannel, reorgChannel
fbac marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) {
Expand All @@ -66,7 +74,8 @@ type contractConfig struct {
fromBlock uint64
contractAddress common.Address
topics []common.Hash
channel chan<- types.Log
eventChannel chan<- types.Log
reorgChannel chan uint64
maxDisconnectTime time.Duration
}

Expand Down Expand Up @@ -119,12 +128,19 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
fromBlock := watcher.fromBlock
logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex()))
startTime := time.Now()
defer close(watcher.channel)
defer close(watcher.eventChannel)

for {
select {
case <-r.ctx.Done():
logger.Debug("Stopping watcher")
return
case reorgBlock := <-watcher.reorgChannel:
fromBlock = reorgBlock
logger.Info(
"Blockchain reorg detected, resuming from block",
zap.Uint64("fromBlock", fromBlock),
)
default:
logs, nextBlock, err := r.getNextPage(watcher, fromBlock)
if err != nil {
Expand Down Expand Up @@ -157,7 +173,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
time.Sleep(NO_LOGS_SLEEP_TIME)
}
for _, log := range logs {
watcher.channel <- log
watcher.eventChannel <- log
}
if nextBlock != nil {
fromBlock = *nextBlock
Expand All @@ -182,7 +198,7 @@ func (r *RpcLogStreamer) getNextPage(
r.logger.Debug("Chain is up to date. Skipping update")
return []types.Log{}, nil, nil
}
numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1
numOfBlocksToProcess := (highestBlockCanProcess - fromBlock) + 1

var to uint64
// Make sure we stay within a reasonable page size
Expand Down Expand Up @@ -211,6 +227,10 @@ func (r *RpcLogStreamer) getNextPage(
return logs, &nextBlockNumber, nil
}

func (r *RpcLogStreamer) Client() ChainClient {
return r.client
}

func buildFilterQuery(
contractConfig contractConfig,
fromBlock int64,
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func buildStreamer(
fromBlock: fromBlock,
contractAddress: address,
topics: []common.Hash{topic},
channel: channel,
eventChannel: channel,
}
return NewRpcLogStreamer(context.Background(), client, log, []contractConfig{cfg}), channel
}
Expand All @@ -41,7 +41,7 @@ func TestBuilder(t *testing.T) {
require.NoError(t, err)
builder := NewRpcLogStreamBuilder(context.Background(), testclient, testutils.NewLog(t))

listenerChannel := builder.ListenForContractEvent(
listenerChannel, _ := builder.ListenForContractEvent(
1,
testutils.RandomAddress(),
[]common.Hash{testutils.RandomLogTopic()}, 5*time.Minute,
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestRpcLogStreamer(t *testing.T) {
fromBlock: fromBlock,
contractAddress: address,
topics: []common.Hash{topic},
channel: make(chan types.Log),
eventChannel: make(chan types.Log),
}

logs, nextPage, err := streamer.getNextPage(cfg, fromBlock)
Expand Down
32 changes: 32 additions & 0 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,35 @@ FROM
WHERE
contract_address = @contract_address;

-- name: InsertBlockchainMessage :exec
INSERT INTO blockchain_messages(block_number, block_hash, originator_node_id, originator_sequence_id, is_canonical)
VALUES (@block_number, @block_hash, @originator_node_id, @originator_sequence_id, @is_canonical)
ON CONFLICT
DO NOTHING;

-- name: GetBlocksInRange :many
-- Returns blocks in ascending order (oldest to newest)
-- StartBlock should be the lower bound (older block)
-- EndBlock should be the upper bound (newer block)
-- Example: GetBlocksInRange(1000, 2000), returns 1000, 1001, 1002, ..., 2000
SELECT DISTINCT ON (block_number)
block_number,
block_hash
FROM
blockchain_messages
WHERE
block_number BETWEEN @start_block AND @end_block
AND block_hash IS NOT NULL
AND is_canonical = TRUE
ORDER BY
block_number ASC,
block_hash;

-- name: UpdateBlocksCanonicalityInRange :exec
UPDATE
blockchain_messages
SET
is_canonical = FALSE
WHERE
block_number >= @reorg_block_number;
Comment on lines +149 to +155
Copy link

@coderabbitai coderabbitai bot Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Set transaction isolation level for canonicality updates.

The UpdateBlocksCanonicalityInRange query should run with SERIALIZABLE isolation to prevent race conditions during reorganizations.

Add this comment before the query:

-- Set transaction isolation level to SERIALIZABLE to prevent race conditions
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fbac is bunny has the right intuition here. Are we holding a lock somewhere to prevent the insertion of new canonical blocks?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!


2 changes: 1 addition & 1 deletion pkg/db/queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 93 additions & 1 deletion pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading