Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: detect chain reorgs #411

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ChainClient interface {
ethereum.BlockNumberReader
ethereum.LogFilterer
ethereum.ChainIDReader
ethereum.ChainReader
}

type TransactionSigner interface {
Expand Down
33 changes: 27 additions & 6 deletions pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,21 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent(
contractAddress common.Address,
topics []common.Hash,
maxDisconnectTime time.Duration,
) <-chan types.Log {
) (<-chan types.Log, chan<- uint64) {
eventChannel := make(chan types.Log, 100)
reorgChannel := make(chan uint64, 1)
c.contractConfigs = append(
c.contractConfigs,
contractConfig{fromBlock, contractAddress, topics, eventChannel, maxDisconnectTime},
contractConfig{
fromBlock,
contractAddress,
topics,
eventChannel,
reorgChannel,
maxDisconnectTime,
},
)
return eventChannel
return eventChannel, reorgChannel
fbac marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) {
Expand All @@ -66,7 +74,8 @@ type contractConfig struct {
fromBlock uint64
contractAddress common.Address
topics []common.Hash
channel chan<- types.Log
eventChannel chan<- types.Log
reorgChannel chan uint64
maxDisconnectTime time.Duration
}

Expand Down Expand Up @@ -119,12 +128,20 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
fromBlock := watcher.fromBlock
logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex()))
startTime := time.Now()
defer close(watcher.channel)
defer close(watcher.eventChannel)
defer close(watcher.reorgChannel)
fbac marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-r.ctx.Done():
logger.Debug("Stopping watcher")
return
case reorgBlock := <-watcher.reorgChannel:
// TODO: Implement reorg handling
// fromBlock = reorgBlock
logger.Info(
"Reorganization handling initiated from block",
zap.Uint64("block", reorgBlock),
)
Comment on lines +139 to +144
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement Reorganization Handling Logic

The TODO comment indicates that reorganization handling is not yet implemented. Proper handling of blockchain reorganizations is crucial to maintain data consistency and integrity. Neglecting this can lead to missed events or processing invalid data.

Would you like assistance in implementing the reorganization handling logic? I can help outline the steps or provide code to handle reorgs effectively.

default:
logs, nextBlock, err := r.getNextPage(watcher, fromBlock)
if err != nil {
Expand Down Expand Up @@ -157,7 +174,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
time.Sleep(NO_LOGS_SLEEP_TIME)
}
for _, log := range logs {
watcher.channel <- log
watcher.eventChannel <- log
}
if nextBlock != nil {
fromBlock = *nextBlock
Expand Down Expand Up @@ -211,6 +228,10 @@ func (r *RpcLogStreamer) getNextPage(
return logs, &nextBlockNumber, nil
}

func (r *RpcLogStreamer) Client() ChainClient {
return r.client
}

func buildFilterQuery(
contractConfig contractConfig,
fromBlock int64,
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func buildStreamer(
fromBlock: fromBlock,
contractAddress: address,
topics: []common.Hash{topic},
channel: channel,
eventChannel: channel,
}
return NewRpcLogStreamer(context.Background(), client, log, []contractConfig{cfg}), channel
}
Expand All @@ -41,7 +41,7 @@ func TestBuilder(t *testing.T) {
require.NoError(t, err)
builder := NewRpcLogStreamBuilder(context.Background(), testclient, testutils.NewLog(t))

listenerChannel := builder.ListenForContractEvent(
listenerChannel, _ := builder.ListenForContractEvent(
1,
testutils.RandomAddress(),
[]common.Hash{testutils.RandomLogTopic()}, 5*time.Minute,
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestRpcLogStreamer(t *testing.T) {
fromBlock: fromBlock,
contractAddress: address,
topics: []common.Hash{topic},
channel: make(chan types.Log),
eventChannel: make(chan types.Log),
}

logs, nextPage, err := streamer.getNextPage(cfg, fromBlock)
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type ContractsOptions struct {
ChainID int `long:"chain-id" env:"XMTPD_CONTRACTS_CHAIN_ID" description:"Chain ID for the appchain" default:"31337"`
RefreshInterval time.Duration `long:"refresh-interval" env:"XMTPD_CONTRACTS_REFRESH_INTERVAL" description:"Refresh interval for the nodes registry" default:"60s"`
MaxChainDisconnectTime time.Duration `long:"max-chain-disconnect-time" env:"XMTPD_CONTRACTS_MAX_CHAIN_DISCONNECT_TIME" description:"Maximum time to allow the node to operate while disconnected" default:"300s"`
// TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume
SafeBlockDistance uint64 `long:"safe-block-distance" env:"XMTPD_CONTRACTS_SAFE_BLOCK_DISTANCE" description:"Safe block distance" default:"0"`
}

type DbOptions struct {
Expand Down
24 changes: 22 additions & 2 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ WHERE
singleton_id = 1;

-- name: InsertGatewayEnvelope :execrows
INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope)
VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope)
INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope, block_number, block_hash, version, is_canonical)
VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope, @block_number, @block_hash, @version, @is_canonical)
ON CONFLICT
DO NOTHING;
fbac marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -122,3 +122,23 @@ FROM
WHERE
contract_address = @contract_address;

-- name: GetEnvelopeVersion :one
SELECT
version
FROM
gateway_envelopes
WHERE
originator_node_id = $1
AND originator_sequence_id = $2
AND is_canonical = TRUE;

-- name: InvalidateEnvelope :exec
UPDATE
gateway_envelopes
SET
is_canonical = FALSE
WHERE
originator_node_id = $1
AND originator_sequence_id = $2
AND is_canonical = TRUE;

4 changes: 4 additions & 0 deletions pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 59 additions & 3 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading