
feat: detect chain reorgs #411

Open · fbac wants to merge 6 commits into main from feat/reorg-detection

Conversation

@fbac (Collaborator) commented Jan 15, 2025

Summary by CodeRabbit

Based on the comprehensive changes, here are the updated release notes:

  • New Features

    • Added support for blockchain event reorganization detection.
    • Enhanced log streaming with version tracking and canonical event management.
    • Introduced new methods for envelope version retrieval and invalidation.
  • Improvements

    • Updated database schema to track blockchain-specific metadata.
    • Improved indexer's ability to handle complex blockchain event scenarios.
  • Database Changes

    • Added new columns for block number, block hash, version, and canonical status.
    • Introduced new database queries for envelope version management.
  • Performance

    • Created optimized database index for blockchain event queries.

These changes enhance the system's resilience and accuracy in tracking blockchain events, particularly during potential blockchain reorganizations.


coderabbitai bot commented Jan 15, 2025

Walkthrough

This pull request introduces comprehensive changes to handle blockchain reorganization events across multiple components. The modifications span database schema updates, indexer logic, and log streaming mechanisms. Key enhancements include adding new channels for reorganization detection, updating database models to track block information, and implementing version management for gateway envelopes. The changes aim to improve the system's resilience to blockchain state changes by introducing mechanisms to detect and potentially handle reorganization scenarios.
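To make the channel plumbing concrete, here is a minimal sketch of the two-channel shape the walkthrough describes; the struct and field names follow the change summary below, but the exact types are assumptions:

import "github.com/ethereum/go-ethereum/core/types"

// Sketch only: the PR threads a reorg channel alongside the event channel.
type contractConfig struct {
	eventChannel chan types.Log // confirmed contract logs from the RPC stream
	reorgChannel chan uint64    // block number at which a reorg was detected
}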

Changes

| File | Change Summary |
| --- | --- |
| pkg/blockchain/rpcLogStreamer.go | Added reorg channel to RpcLogStreamBuilder and contractConfig structures; modified ListenForContractEvent method signature. |
| pkg/blockchain/rpcLogStreamer_test.go | Renamed channel to eventChannel in test configurations; updated method calls to reflect changes. |
| pkg/db/queries.sql | Updated INSERT INTO gateway_envelopes with new columns and added new SQL statements for envelope version and canonicity. |
| pkg/db/queries/models.go | Added BlockNumber, BlockHash, Version, and IsCanonical fields to GatewayEnvelope struct. |
| pkg/db/queries/queries.sql.go | Added methods for getting envelope version and invalidating envelopes. |
| pkg/indexer/indexer.go | Added reorg channels; updated indexLogs and StartIndexer methods to handle reorganization detection. |
| pkg/indexer/storer/groupMessage.go | Updated StoreLog method to support appending logs and tracking block information. |
| pkg/blockchain/interface.go | Added ChainReader interface to ChainClient. |
| pkg/config/options.go | Added SafeBlockDistance field to ContractsOptions. |
| pkg/indexer/storer/identityUpdate.go | Updated StoreLog method to support log appending. |
| pkg/indexer/storer/interface.go | Modified StoreLog method signature to include appendLog parameter. |
| pkg/indexer/storer/logAppend.go | Added GetVersionForAppend function for managing log versioning. |
| pkg/migrations/* | Added SQL migrations for adding blockchain-related columns to gateway_envelopes table. |

Sequence Diagram

sequenceDiagram
    participant Indexer
    participant LogStreamer
    participant BlockchainClient
    participant Database

    Indexer->>LogStreamer: Configure log stream
    LogStreamer-->>Indexer: Return event and reorg channels
    Indexer->>BlockchainClient: Start indexing logs
    BlockchainClient->>Indexer: Stream log events
    Indexer->>Database: Store log events
    BlockchainClient->>Indexer: Detect potential reorganization
    Indexer->>Database: Invalidate previous entries
    Indexer->>LogStreamer: Send reorg notification
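As a rough illustration of the loop the diagram implies (helper names like storeLog and invalidateFrom are hypothetical, not functions from this PR):

for {
	select {
	case event := <-eventChannel:
		if err := storeLog(ctx, event); err != nil {
			logger.Error("failed to store log", zap.Error(err))
		}
	case reorgBlock := <-reorgChannel:
		// Invalidate everything from the first reorged block, then re-stream.
		if err := invalidateFrom(ctx, reorgBlock); err != nil {
			logger.Error("failed to invalidate envelopes", zap.Error(err))
		}
		fromBlock = reorgBlock
	case <-ctx.Done():
		return
	}
}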

Possibly related PRs

  • feat: make blocktracker aware of block hashes #400: makes the blockTracker aware of block hashes, which this PR builds on for reorganization detection and event tracking.

Suggested reviewers

  • neekolas

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f8e846 and 743cfa8.

📒 Files selected for processing (2)
  • pkg/blockchain/rpcLogStreamer.go (5 hunks)
  • pkg/indexer/indexer.go (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/blockchain/rpcLogStreamer.go
  • pkg/indexer/indexer.go


@coderabbitai bot left a comment

Actionable comments posted: 7

🧹 Nitpick comments (2)
pkg/indexer/indexer.go (1)

225-236: Potential Performance Impact Due to Frequent Block Fetching

Fetching the block using client.BlockByNumber for every event can significantly impact performance, especially when processing a high volume of events.

Consider the following optimization:

  • Batch Processing: Fetch block hashes in batches or at set intervals instead of per event.
  • Caching: Implement a caching mechanism to store recently fetched block hashes.

Example code snippet for caching:

var blockHashCache = make(map[uint64][]byte)
// Before fetching the block
if hash, exists := blockHashCache[storedBlockNumber]; exists {
    // Use the cached hash
    latestBlockHash = hash
} else {
    // Fetch and cache the block hash
    block, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
    if err != nil {
        // Handle error
    }
    latestBlockHash = block.Hash().Bytes()
    blockHashCache[storedBlockNumber] = latestBlockHash
}
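One caveat with the cache sketch above: if indexLogs runs concurrently (for example, one goroutine per contract sharing the cache), the map needs synchronization, and an unbounded map will grow forever. A hedged variant with a mutex:

var (
	blockHashMu    sync.RWMutex
	blockHashCache = make(map[uint64][]byte) // consider an LRU to bound memory
)

func cachedBlockHash(n uint64) ([]byte, bool) {
	blockHashMu.RLock()
	defer blockHashMu.RUnlock()
	h, ok := blockHashCache[n]
	return h, ok
}

func storeBlockHash(n uint64, h []byte) {
	blockHashMu.Lock()
	defer blockHashMu.Unlock()
	blockHashCache[n] = h
}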
pkg/migrations/00004_update-gateway-envelopes.sql (1)

1-5: Add indexes for chain reorganization queries.

Consider adding indexes on (block_number, block_hash) and is_canonical to optimize queries during chain reorganization handling.

CREATE INDEX idx_gateway_envelopes_block ON gateway_envelopes(block_number, block_hash);
CREATE INDEX idx_gateway_envelopes_canonical ON gateway_envelopes(is_canonical);
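If non-canonical rows turn out to be rare, a partial index (e.g. ... ON gateway_envelopes(is_canonical) WHERE is_canonical = FALSE) might be cheaper than indexing the whole boolean column for lookups of invalidated envelopes; whether that fits the actual query shapes here is worth checking against EXPLAIN output.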
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7523063 and a36a4b6.

📒 Files selected for processing (8)
  • pkg/blockchain/rpcLogStreamer.go (4 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (2 hunks)
  • pkg/db/queries.sql (1 hunks)
  • pkg/db/queries/models.go (1 hunks)
  • pkg/db/queries/queries.sql.go (5 hunks)
  • pkg/indexer/indexer.go (8 hunks)
  • pkg/indexer/storer/groupMessage.go (1 hunks)
  • pkg/migrations/00004_update-gateway-envelopes.sql (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • pkg/blockchain/rpcLogStreamer_test.go
🔇 Additional comments (2)
pkg/db/queries/models.go (1)

25-28: Ensure Consistency of New Fields in GatewayEnvelope Struct

The addition of BlockNumber, BlockHash, Version, and IsCanonical fields to the GatewayEnvelope struct is appropriate for tracking blockchain event metadata. Ensure that these fields are consistently populated throughout the application and properly handled in database operations.

Please verify that:

  • The database migrations have been applied to include the new columns.
  • The fields are correctly set when inserting new GatewayEnvelope records.
  • Any queries or usages of GatewayEnvelope are updated to accommodate the new fields.
pkg/db/queries/queries.sql.go (1)

142-143: LGTM! Generated code matches schema changes.

The generated code correctly reflects the database schema changes. No manual modifications needed as this is an auto-generated file.

Also applies to: 153-156, 165-168, 246-246, 280-283

Comment on lines +139 to +144
// TODO: Implement reorg handling
// fromBlock = reorgBlock
logger.Info(
"Reorganization handling initiated from block",
zap.Uint64("block", reorgBlock),
)

⚠️ Potential issue

Implement Reorganization Handling Logic

The TODO comment indicates that reorganization handling is not yet implemented. Proper handling of blockchain reorganizations is crucial to maintain data consistency and integrity. Neglecting this can lead to missed events or processing invalid data.

Would you like assistance in implementing the reorganization handling logic? I can help outline the steps or provide code to handle reorgs effectively.

Resolved review threads:
  • pkg/blockchain/rpcLogStreamer.go (outdated)
  • pkg/blockchain/rpcLogStreamer.go
  • pkg/indexer/indexer.go (outdated)
  • pkg/migrations/00004_update-gateway-envelopes.sql (outdated)
  • pkg/db/queries.sql
  • pkg/indexer/storer/groupMessage.go (outdated)
@fbac force-pushed the feat/reorg-detection branch 2 times, most recently from 4a96e4f to a99d582, on January 16, 2025 at 13:54
@coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.

Actionable comments posted: 5

🔭 Outside diff range comments (2)
pkg/indexer/storer/groupMessage_test.go (1)

Line range hint 1-143: Add dedicated test cases for chain reorganization scenarios.

The test suite should include dedicated test cases to verify the system's behavior during chain reorganizations. Consider adding these test cases:

  1. Chain Reorg Detection

    • Test detection of non-canonical blocks
    • Test handling of orphaned blocks
    • Test recovery from deep reorgs
  2. Edge Cases

    • Test concurrent reorgs
    • Test invalid block sequences
    • Test missing block information

Would you like me to provide example implementations for these test cases?
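For instance, a sketch of the first case, detection of a replaced block, under the assumption that helpers like newTestStorer, testLog, and latestEnvelope exist as fixtures (they are illustrative, not the repo's actual test utilities):

func TestStoreLogDetectsReplacedBlock(t *testing.T) {
	ctx := context.Background()
	storer, db := newTestStorer(t) // hypothetical fixture

	// Store a message from block 10 with its original hash.
	original := testLog(10, common.HexToHash("0xaaa"))
	require.NoError(t, storer.StoreLog(ctx, original, false))

	// The same sequence ID arrives again from a competing block: a reorg.
	reorged := testLog(10, common.HexToHash("0xbbb"))
	require.NoError(t, storer.StoreLog(ctx, reorged, true))

	env := latestEnvelope(t, db) // hypothetical query helper
	require.True(t, env.IsCanonical.Bool, "replacement should be canonical")
	require.Equal(t, int32(2), env.Version.Int32, "version should be bumped")
}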

pkg/db/queries/queries.sql.go (1)

Line range hint 165-191: Reconsider the conflict resolution strategy.

The current ON CONFLICT DO NOTHING strategy might not be optimal for handling chain reorganizations. Consider using ON CONFLICT DO UPDATE to update the block information, version, and canonical status when conflicts occur, ensuring we don't silently ignore potential reorg updates.

Example modification:

-ON CONFLICT
-	DO NOTHING
+ON CONFLICT (originator_node_id, originator_sequence_id)
+	DO UPDATE SET
+		block_number = EXCLUDED.block_number,
+		block_hash = EXCLUDED.block_hash,
+		version = EXCLUDED.version,
+		is_canonical = EXCLUDED.is_canonical
+	WHERE gateway_envelopes.version < EXCLUDED.version
♻️ Duplicate comments (1)
pkg/indexer/indexer.go (1)

224-259: ⚠️ Potential issue

Critical: Incomplete reorg detection and handling

The current implementation has several critical issues:

  1. The safe block distance check event.BlockNumber-storedBlockNumber < safeBlockDistance might miss reorgs that happen within the safe distance.
  2. The reorg channel send is commented out, leaving the reorg handling incomplete.
  3. The TODO items outline critical steps that need to be implemented.

Consider implementing the following changes:

 if !bytes.Equal(storedBlockHash, latestBlock.Hash().Bytes()) {
     logger.Warn("blockchain reorg detected",
         zap.Uint64("storedBlockNumber", storedBlockNumber),
         zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)),
         zap.String("onchainBlockHash", latestBlock.Hash().String()),
     )

-    // TODO: Implement reorg handling:
-    // reorgChannel <- event.BlockNumber
+    // Send the reorg event
+    select {
+    case reorgChannel <- storedBlockNumber:
+        logger.Info("sent reorg event", 
+            zap.Uint64("blockNumber", storedBlockNumber))
+    default:
+        logger.Warn("reorg channel full, skipping reorg event",
+            zap.Uint64("blockNumber", storedBlockNumber))
+    }
 }
🧹 Nitpick comments (2)
pkg/db/queries/queries.sql.go (2)

243-262: Enhance error handling for envelope invalidation.

The function could benefit from:

  1. Returning the number of rows affected to verify if any envelope was actually invalidated
  2. Explicit handling of the case where no rows are updated (might indicate a race condition)
-func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) error {
-	_, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID)
-	return err
+func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) (int64, error) {
+	result, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID)
+	if err != nil {
+		return 0, err
+	}
+	return result.RowsAffected()
+}

Line range hint 290-327: Consider additional filtering options for envelope selection.

The query could benefit from optional filters for:

  1. is_canonical status to filter out invalidated envelopes
  2. block_number range to help with reorg analysis
  3. version to track envelope history

This would require updating the stored procedure select_gateway_envelopes to accept these additional parameters.
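Concretely, the generated params struct might grow along these lines; the added fields are hypothetical (existing fields are elided) and would have to be introduced in queries.sql first:

type SelectGatewayEnvelopesParams struct {
	OriginatorNodeIds []int32
	// Hypothetical additions:
	OnlyCanonical  bool          // filter out invalidated envelopes
	MinBlockNumber sql.NullInt64 // lower bound for reorg analysis
	MaxBlockNumber sql.NullInt64 // upper bound for reorg analysis
	MinVersion     sql.NullInt32 // envelope history tracking
}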

🛑 Comments failed to post (5)
pkg/indexer/storer/groupMessage_test.go (2)

62-64: 🛠️ Refactor suggestion

Add test coverage for chain reorganization fields.

The test should be updated to verify the new fields (block_number, block_hash, version, is_canonical) that were added to handle chain reorganizations. Currently, it only validates the envelope content and sequence ID.

Add assertions to verify these fields:

 require.NoError(t, err)
+require.NotNil(t, firstEnvelope.BlockNumber, "Block number should be set")
+require.NotEmpty(t, firstEnvelope.BlockHash, "Block hash should be set")
+require.Equal(t, int32(1), firstEnvelope.Version, "Initial version should be 1")
+require.True(t, firstEnvelope.IsCanonical, "Message should be marked as canonical")



111-113: 🛠️ Refactor suggestion

Enhance duplicate message test for chain reorganization scenarios.

The duplicate message test should be expanded to verify the version increment and canonicality updates when the same message is received from different blocks (chain reorg scenario).

Consider adding these test cases:

 require.NoError(t, queryErr)
 require.Equal(t, len(envelopes), 1)
+
+// Verify version and canonicality handling
+envelope := envelopes[0]
+require.Equal(t, int32(2), envelope.Version, "Version should be incremented for duplicate")
+require.True(t, envelope.IsCanonical, "Message should remain canonical")
+
+// Test reorg scenario with different block
+reorgLog := logMessage
+reorgLog.BlockNumber += 1
+reorgLog.BlockHash = common.HexToHash("0xdifferent")
+err = storer.StoreLog(ctx, reorgLog)
+require.NoError(t, err)
+
+reorgEnvelopes, err := storer.db.SelectGatewayEnvelopes(ctx, queries.SelectGatewayEnvelopesParams{
+    OriginatorNodeIds: []int32{0},
+})
+require.NoError(t, err)
+require.Equal(t, 1, len(reorgEnvelopes), "Should still have one envelope")
+
+reorgEnvelope := reorgEnvelopes[0]
+require.Equal(t, int32(3), reorgEnvelope.Version, "Version should be incremented")
+require.Equal(t, reorgLog.BlockNumber, reorgEnvelope.BlockNumber, "Block number should be updated")
+require.Equal(t, reorgLog.BlockHash.Hex(), reorgEnvelope.BlockHash, "Block hash should be updated")


pkg/indexer/storer/logAppend.go (1)

12-49: ⚠️ Potential issue

Wrap database operations in a transaction for atomicity.

The function performs multiple database operations (get version and invalidate envelope) that should be atomic to prevent race conditions and ensure data consistency during chain reorganizations.

Consider wrapping the operations in a transaction:

 func GetVersionForAppend(
 	ctx context.Context,
 	querier *queries.Queries,
 	logger *zap.Logger,
 	originatorNodeID int32,
 	sequenceID int64,
 ) (sql.NullInt32, error) {
 	var version sql.NullInt32
+
+	// Start transaction
+	tx, err := querier.Begin(ctx)
+	if err != nil {
+		logger.Error("Error starting transaction", zap.Error(err))
+		return version, err
+	}
+	defer tx.Rollback()
+
+	qtx := querier.WithTx(tx)
-	currentVersion, err := querier.GetEnvelopeVersion(ctx, queries.GetEnvelopeVersionParams{
+	currentVersion, err := qtx.GetEnvelopeVersion(ctx, queries.GetEnvelopeVersionParams{
 		OriginatorNodeID:     originatorNodeID,
 		OriginatorSequenceID: sequenceID,
 	})
 	if err != nil && !errors.Is(err, sql.ErrNoRows) {
 		logger.Error("Error getting current version", zap.Error(err))
 		return version, err
 	}
 
 	if errors.Is(err, sql.ErrNoRows) {
 		return version, err
 	}
 
 	if err == nil {
-		if err = querier.InvalidateEnvelope(ctx, queries.InvalidateEnvelopeParams{
+		if err = qtx.InvalidateEnvelope(ctx, queries.InvalidateEnvelopeParams{
 			OriginatorNodeID:     originatorNodeID,
 			OriginatorSequenceID: sequenceID,
 		}); err != nil {
 			logger.Error("Error invalidating old envelope", zap.Error(err))
 			return version, err
 		}
 
 		version = sql.NullInt32{
 			Int32: currentVersion.Int32 + 1,
 			Valid: true,
 		}
 	}
 
+	if err = tx.Commit(); err != nil {
+		logger.Error("Error committing transaction", zap.Error(err))
+		return version, err
+	}
+
 	return version, nil
 }

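Note that sqlc's generated Queries type does not itself expose Begin; the transaction is normally opened on the underlying *sql.DB and attached with the generated WithTx helper. A minimal sketch of that standard pattern, assuming this repo's generated code follows sqlc defaults:

func withTx(ctx context.Context, db *sql.DB, q *queries.Queries, fn func(*queries.Queries) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // no-op once Commit succeeds

	if err := fn(q.WithTx(tx)); err != nil {
		return err
	}
	return tx.Commit()
}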

pkg/indexer/storer/identityUpdate.go (2)

174-195: 🛠️ Refactor suggestion

Simplify error handling logic.

The error handling has redundant checks and suboptimal log placement. Consider simplifying:

 version := sql.NullInt32{Int32: 1, Valid: true}

 if appendLog {
     version, err = GetVersionForAppend(
         ctx,
         querier,
         s.logger,
         IDENTITY_UPDATE_ORIGINATOR_ID,
         int64(msgSent.SequenceId),
     )
     if err != nil {
-        if !errors.Is(err, sql.ErrNoRows) {
-            return NewLogStorageError(err, true)
-        }
-        if errors.Is(err, sql.ErrNoRows) {
-            s.logger.Debug("No rows found for envelope, inserting new",
-                zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
-                zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
-            )
-        }
+        if errors.Is(err, sql.ErrNoRows) {
+            s.logger.Debug("No rows found for envelope, inserting new",
+                zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
+                zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
+            )
+        } else {
+            return NewLogStorageError(err, true)
+        }
     }
 }
📝 Committable suggestion


			version := sql.NullInt32{Int32: 1, Valid: true}

			if appendLog {
				version, err = GetVersionForAppend(
					ctx,
					querier,
					s.logger,
					IDENTITY_UPDATE_ORIGINATOR_ID,
					int64(msgSent.SequenceId),
				)
				if err != nil {
					if errors.Is(err, sql.ErrNoRows) {
						s.logger.Debug("No rows found for envelope, inserting new",
							zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
							zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
						)
					} else {
						return NewLogStorageError(err, true)
					}
				}
			}

52-56: 💡 Codebase verification

Breaking change in StoreLog requires updates in multiple files

The following files still use the outdated 2-parameter version of StoreLog and need the new appendLog parameter:

  • pkg/mocks/storer/mock_LogStorer.go
  • pkg/indexer/indexer_test.go
  • pkg/indexer/storer/groupMessage.go
  • pkg/indexer/storer/identityUpdate_test.go
  • pkg/indexer/storer/groupMessage_test.go
🔗 Analysis chain

Verify all callers of StoreLog are updated.

The method signature has been updated to include the appendLog parameter. This is a breaking change that requires updates to all callers.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all calls to StoreLog to ensure they've been updated
rg -A 2 "StoreLog\(" --type go

Length of output: 2632

@fbac force-pushed the feat/reorg-detection branch from a99d582 to 7e6a14e on January 16, 2025 at 13:56
@coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (5)
pkg/indexer/storer/identityUpdate.go (2)

27-27: Consider making IDENTITY_UPDATE_ORIGINATOR_ID configurable.

The constant IDENTITY_UPDATE_ORIGINATOR_ID is hardcoded to 1, and the comment suggests that this may need to vary for different smart contracts. To enhance flexibility and maintainability, consider externalizing this value to a configuration file or environment variable.


175-196: Simplify error handling logic for clarity.

The current error handling for sql.ErrNoRows involves a redundant if check. Simplifying this logic improves readability and maintainability.

Apply this diff to simplify the error handling:

 if err != nil {
-    if !errors.Is(err, sql.ErrNoRows) {
+    if errors.Is(err, sql.ErrNoRows) {
+        s.logger.Debug("No rows found for envelope, inserting new",
+            zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
+            zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
+        )
+    } else {
         return NewLogStorageError(err, true)
     }
-    if errors.Is(err, sql.ErrNoRows) {
-        s.logger.Debug("No rows found for envelope, inserting new",
-            zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
-            zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
-        )
-    }
 }
pkg/indexer/indexer_test.go (1)

74-81: Add test cases for retry-reorg interaction.

The test verifies retryable error handling but doesn't test how retries interact with chain reorganizations. Consider adding test cases that:

  1. Simulate a reorg during a retry
  2. Verify retry behavior when a reorg occurs
  3. Test error handling during reorg processing

Example test case:

func TestIndexLogsRetryDuringReorg(t *testing.T) {
    // Setup channels and context...
    
    // Simulate error and reorg
    logStorer.EXPECT().
        StoreLog(mock.Anything, event, false).
        RunAndReturn(func(ctx context.Context, log types.Log, isCanonical bool) storer.LogStorageError {
            // Trigger reorg during retry
            reorgChannel <- uint64(5)
            return storer.NewLogStorageError(errors.New("retryable error"), true)
        })
    
    // Verify handling...
}

Also applies to: 94-123

pkg/indexer/storer/groupMessage.go (1)

17-20: Consider making the originator ID configurable.

The TODO comment suggests not hardcoding the originator ID. Consider:

  1. Making it configurable via constructor
  2. Using a registry pattern for contract-specific IDs

Example:

type Config struct {
    OriginatorID int
}

func NewGroupMessageStorer(
    queries *queries.Queries,
    logger *zap.Logger,
    contract *groupmessages.GroupMessages,
    config Config,
) *GroupMessageStorer {
    return &GroupMessageStorer{
        queries:      queries,
        logger:       logger.Named("GroupMessageStorer"),
        contract:     contract,
        originatorID: config.OriginatorID,
    }
}
pkg/db/queries/queries.sql.go (1)

243-262: Consider returning affected rows count.

While the implementation is correct, returning the number of affected rows would help verify whether the invalidation actually occurred.

-func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) error {
-    _, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID)
-    return err
+func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) (int64, error) {
+    result, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID)
+    if err != nil {
+        return 0, err
+    }
+    return result.RowsAffected()
}
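With that change, a caller could distinguish a genuine invalidation from a no-op (a hypothetical usage sketch):

rows, err := querier.InvalidateEnvelope(ctx, params)
if err != nil {
	return err
}
if rows == 0 {
	// Nothing canonical matched this (node, sequence) pair: either it was
	// already invalidated or another writer raced us.
	logger.Warn("envelope invalidation affected no rows")
}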
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 33ba92c and 7f8e846.

⛔ Files ignored due to path filters (35)
  • pkg/proto/identity/api/v1/identity.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/identity/associations/association.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/identity/associations/signature.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/identity/credential.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/keystore_api/v1/keystore.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_api/v1/authn.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_api/v1/message_api.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/ciphertext.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/composite.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/contact.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/content.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/conversation_reference.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/ecies.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/frames.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/invitation.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/message.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/private_key.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/private_preferences.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/public_key.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/signature.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/message_contents/signed_payload.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/api/v1/mls.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/database/intents.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/content.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/content_types/reaction.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/group_membership.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/group_metadata.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/group_mutable_metadata.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/group_permissions.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls/message_contents/transcript_messages.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/mls_validation/v1/service.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/xmtpv4/envelopes/envelopes.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/xmtpv4/message_api/message_api.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/xmtpv4/message_api/misbehavior_api.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
  • pkg/proto/xmtpv4/payer_api/payer_api.pb.go is excluded by !**/*.pb.go, !pkg/proto/**
📒 Files selected for processing (21)
  • pkg/blockchain/interface.go (1 hunks)
  • pkg/blockchain/rpcLogStreamer.go (5 hunks)
  • pkg/blockchain/rpcLogStreamer_test.go (3 hunks)
  • pkg/config/options.go (1 hunks)
  • pkg/db/queries.sql (2 hunks)
  • pkg/db/queries/models.go (1 hunks)
  • pkg/db/queries/queries.sql.go (7 hunks)
  • pkg/indexer/indexer.go (8 hunks)
  • pkg/indexer/indexer_test.go (1 hunks)
  • pkg/indexer/storer/groupMessage.go (4 hunks)
  • pkg/indexer/storer/groupMessage_test.go (3 hunks)
  • pkg/indexer/storer/identityUpdate.go (4 hunks)
  • pkg/indexer/storer/identityUpdate_test.go (1 hunks)
  • pkg/indexer/storer/interface.go (1 hunks)
  • pkg/indexer/storer/logAppend.go (1 hunks)
  • pkg/interceptors/server/auth_test.go (4 hunks)
  • pkg/migrations/00004_add_blockchain_columns.down.sql (1 hunks)
  • pkg/migrations/00004_add_blockchain_columns.up.sql (1 hunks)
  • pkg/mocks/authn/mock_JWTVerifier.go (2 hunks)
  • pkg/mocks/blockchain/mock_ChainClient.go (4 hunks)
  • pkg/mocks/storer/mock_LogStorer.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (10)
  • pkg/blockchain/interface.go
  • pkg/indexer/storer/interface.go
  • pkg/config/options.go
  • pkg/db/queries/models.go
  • pkg/blockchain/rpcLogStreamer_test.go
  • pkg/migrations/00004_add_blockchain_columns.down.sql
  • pkg/migrations/00004_add_blockchain_columns.up.sql
  • pkg/indexer/storer/logAppend.go
  • pkg/indexer/storer/groupMessage_test.go
  • pkg/blockchain/rpcLogStreamer.go
🔇 Additional comments (25)
pkg/indexer/storer/identityUpdate.go (1)

53-57: Ensure all calls to StoreLog are updated with the new appendLog parameter.

The StoreLog method signature now includes an additional appendLog parameter. Please verify that all implementations and usages of this method have been updated accordingly to prevent potential runtime errors.

Run the following script to identify all calls to StoreLog that may need updating:

✅ Verification successful

All StoreLog calls are properly updated with the new parameter

All implementations and usages of StoreLog have been verified to include the new appendLog parameter, with consistent usage of false as the default value. No outdated calls were found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to StoreLog and check for the correct number of arguments.

rg 'StoreLog\(' -t go -A 1

Length of output: 2043

pkg/mocks/storer/mock_LogStorer.go (4)

27-37: Update mock StoreLog method to reflect new parameter.

The StoreLog method in MockLogStorer has been updated to include the appendLog parameter, ensuring consistency with the interface. Verify that all mock expectations and return values are appropriately adjusted.


55-57: Ensure test expectations include appendLog parameter.

The StoreLog expectation in the mock now includes the appendLog parameter. Please verify that all tests setting expectations on StoreLog provide the correct arguments.


60-62: Pass appendLog parameter in mock Run function.

The Run method now correctly passes the appendLog parameter to the provided function, ensuring accurate simulation of method behavior in tests.


72-72: Include appendLog in RunAndReturn for completeness.

The RunAndReturn method is updated to accept the appendLog parameter, maintaining consistency in the mock's behavior.

pkg/indexer/storer/identityUpdate_test.go (1)

82-82: Update StoreLog call with appendLog parameter.

In TestStoreIdentityUpdate, the call to StoreLog now includes false for the appendLog parameter. Ensure that this aligns with the intended test scenario and that other tests are updated accordingly.

pkg/mocks/blockchain/mock_ChainClient.go (1)

32-89: LGTM! The mock implementation is complete and correct.

The auto-generated mock file correctly implements all the required blockchain client methods with proper:

  • Method signatures
  • Return value handling
  • Error cases
  • Helper methods for expectations

Also applies to: 91-148, 501-558, 560-615, 617-675

pkg/db/queries.sql (3)

16-19: Enhance conflict handling for chain reorganizations.

The current ON CONFLICT DO NOTHING might not handle chain reorganizations correctly. Consider updating the is_canonical flag when conflicts occur based on block metadata.


125-134: LGTM! The version retrieval query is well-designed.

The query correctly:

  • Retrieves the version for canonical envelopes only
  • Uses proper parameter binding

135-144: LGTM! The envelope invalidation query is well-designed.

The query correctly:

  • Updates only canonical envelopes
  • Uses proper parameter binding
  • Has appropriate WHERE clause conditions
pkg/indexer/storer/groupMessage.go (2)

88-89: Implement dynamic version and canonical status handling.

The version and canonical status are hardcoded. Consider:

  1. Implementing version based on protocol or message format version
  2. Determining canonical status based on block confirmation depth

Also applies to: 114-117
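For the confirmation-depth idea, one possible shape, using the SafeBlockDistance option this PR adds to ContractsOptions (a sketch, not the PR's implementation):

// isCanonicalByDepth treats an event as canonical once it sits at least
// safeBlockDistance blocks below the current chain head.
func isCanonicalByDepth(eventBlock, chainHead, safeBlockDistance uint64) bool {
	return chainHead >= eventBlock && chainHead-eventBlock >= safeBlockDistance
}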


90-109: LGTM! The version retrieval logic is well-implemented.

The code correctly:

  • Handles version retrieval for appending
  • Properly handles the no-rows case
  • Includes appropriate logging
pkg/indexer/indexer.go (5)

138-138: LGTM! Well-structured channel declarations.

The send-only channels (chan<- uint64) are appropriately typed for their purpose of signaling reorganization events.

Also applies to: 140-140


162-162: LGTM! Proper initialization of reorg channels.

The reorg channels are correctly initialized and returned from the builder, maintaining consistency with the struct updates.

Also applies to: 184-184, 195-197


212-215: LGTM! Enhanced function signature for reorg handling.

The addition of blockchain client, reorg channel, and safe block distance parameters provides the necessary components for reorg detection and handling.


92-95: LGTM! Proper parameter passing for reorg handling.

The function correctly passes the blockchain client, reorg channels, and safe block distance to both message and identity update indexers.

Also applies to: 116-120


224-259: ⚠️ Potential issue

Complete the reorg handling implementation.

The reorg detection logic is well-structured, but several critical items need attention:

  1. The reorgChannel send is commented out, leaving the reorg handling incomplete
  2. The safeBlockDistance check could be more robust by also verifying the minimum safe distance
  3. The TODO comments outline important steps that need implementation

Let's verify the reorg detection logic:

Consider implementing the following improvements:

 if event.BlockNumber > storedBlockNumber &&
-    event.BlockNumber-storedBlockNumber < safeBlockDistance {
+    event.BlockNumber-storedBlockNumber < safeBlockDistance &&
+    event.BlockNumber-storedBlockNumber >= MIN_SAFE_DISTANCE {
     latestBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
     // ... existing code ...
     if !bytes.Equal(storedBlockHash, latestBlock.Hash().Bytes()) {
         // ... existing code ...
-        // reorgChannel <- event.BlockNumber
+        reorgChannel <- event.BlockNumber
     }
 }
pkg/db/queries/queries.sql.go (3)

78-99: LGTM! Well-implemented version retrieval.

The query correctly filters for canonical envelopes and properly handles NULL versions using sql.NullInt32.


165-166: LGTM! Comprehensive envelope storage.

The changes properly track block information, versioning, and canonicity status, which are essential for reorg handling.

Also applies to: 176-179, 188-191


290-290: LGTM! Consistent envelope retrieval.

The SELECT clause and scan operations properly include all new fields, maintaining consistency with the updated struct.

Also applies to: 324-327

pkg/mocks/authn/mock_JWTVerifier.go (4)

28-37: LGTM: Proper handling of uint32 return value

The mock correctly handles the uint32 return value through type assertions and function calls.


39-45: LGTM: Error handling remains consistent

The error handling logic is maintained while accommodating the new return type structure.


66-74: LGTM: Mock helper methods updated correctly

The Return and RunAndReturn methods are properly updated to handle both uint32 and error return values.


Line range hint 28-74: Verify the semantic meaning of uint32 return value

The introduction of a uint32 return value suggests it carries semantic meaning (e.g., status code, identifier). This should be documented to ensure correct usage.

Let's check for any documentation or usage patterns:

✅ Verification successful

The uint32 return value represents the source node ID in the node-to-node authentication system

The uint32 return value from Verify() has clear semantic meaning: it represents the ID of the source node that created the token. This is used in the system's node-to-node authentication mechanism, where tokens are created by one node (the source) targeting another node (the audience), and the verification process returns the source node's ID upon successful validation.
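In interface form, the verified behavior amounts to the following (an illustrative restatement; the actual definition presumably lives in the authn package):

type JWTVerifier interface {
	// Verify validates a node-to-node token and returns the ID of the
	// source node that created it.
	Verify(tokenString string) (uint32, error)
}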

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for documentation or usage patterns of the uint32 return value
rg -g '!vendor' -g '*.go' "type JWTVerifier interface" -A 5

Length of output: 205


Script:

#!/bin/bash
# Search for implementations and usage of JWTVerifier.Verify
echo "=== Searching for implementations ==="
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) Verify(tokenString string) (uint32, error) {
  $$$
}'

echo -e "\n=== Searching for method usage ==="
rg -g '!vendor' -g '*.go' -B 3 -A 3 '\.Verify\(' 

echo -e "\n=== Searching for related documentation ==="
rg -g '!vendor' -g '*.go' -B 3 "(?i)jwt.*verify"

Length of output: 9853


Script:

#!/bin/bash
# Search for VERIFIER_NODE_ID constant and related configuration
echo "=== Searching for node ID constants and configuration ==="
rg -g '!vendor' -g '*.go' "VERIFIER_NODE_ID|CreateToken|type.*Claims" -B 2 -A 2

echo -e "\n=== Searching for token creation implementation ==="
ast-grep --pattern 'func ($_ $_) CreateToken(nodeId uint32) ($_, error) {
  $$$
}'

Length of output: 8525

pkg/interceptors/server/auth_test.go (1)

39-39: Verify the interceptor's handling of uint32 values

The tests don't verify if the uint32 value is properly propagated through the interceptor chain.

Let's check how the interceptor uses this value:

Also applies to: 74-74, 134-134, 159-159

Comment on lines 22 to +29
 channel := make(chan types.Log, 10)
-defer close(channel)
+reorgChannel := make(chan uint64, 1)
 ctx, cancel := context.WithCancel(context.Background())
+defer func() {
+	cancel()
+	close(channel)
+	close(reorgChannel)
+}()
@coderabbitai bot commented Jan 16, 2025

🛠️ Refactor suggestion

Add test cases for chain reorganization handling.

The test setup includes a reorgChannel for handling chain reorganizations, but the test doesn't verify this functionality. Consider adding test cases that:

  1. Simulate a chain reorganization event
  2. Verify the system's response to reorgs
  3. Test different block distances

Example test case structure:

func TestIndexLogsWithReorg(t *testing.T) {
    channel := make(chan types.Log, 10)
    reorgChannel := make(chan uint64, 1)
    ctx, cancel := context.WithCancel(context.Background())
    defer func() {
        cancel()
        close(channel)
        close(reorgChannel)
    }()

    // Setup mocks...
    
    // Simulate reorg
    reorgChannel <- uint64(5) // Reorg at block 5
    
    // Verify handling...
}

Also applies to: 44-68

@fbac (Collaborator, Author) replied:

This will be added in a future PR.

@coderabbitai bot replied:

@fbac Understood. Would you like me to create a GitHub issue to track the addition of these test cases? This will help ensure it's not overlooked in the future PR.

Resolved review thread: pkg/interceptors/server/auth_test.go
@fbac marked this pull request as ready for review on January 16, 2025 at 15:30
@fbac requested a review from a team as a code owner on January 16, 2025 at 15:30
@fbac changed the title from "feat: detect and handle chain reorgs" to "feat: detect chain reorgs" on January 16, 2025
@fbac force-pushed the feat/reorg-detection branch from cf48f8c to 743cfa8 on January 16, 2025 at 16:14

// TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume
if event.BlockNumber > storedBlockNumber &&
event.BlockNumber-storedBlockNumber < safeBlockDistance {
A Contributor commented:

We need to balance timeliness with safety here. A lot of these blockchain events are the kind of thing a human is going to be waiting for on their phone. I think that means we need to be optimistic and rely on preconfirmations, and then fix later in the rare emergency case.

func (s *IdentityUpdateStorer) StoreLog(
ctx context.Context,
event types.Log,
appendLog bool,
A Contributor commented:

Maybe we can add a comment about when you would and wouldn't want to use this flag.
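For example, a doc comment along these lines could capture the distinction (wording is a sketch inferred from how the flag behaves in this PR):

// appendLog selects the write mode for an envelope that may already exist:
//   - false: first-time indexing; a conflict on (originator_node_id,
//     originator_sequence_id) is ignored.
//   - true: re-processing after a suspected reorg; an existing canonical
//     envelope is invalidated and the new log is stored with version+1.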

@@ -150,6 +173,10 @@ type InsertGatewayEnvelopeParams struct {
 	OriginatorSequenceID int64
 	Topic                []byte
 	OriginatorEnvelope   []byte
+	BlockNumber          sql.NullInt64
+	BlockHash            []byte
A Contributor commented:

Just thinking out loud, but what if we did this relationally (like with a table called block_versions) and joined to that rather than denormalizing?

It would use less storage and still make it easy to find impacted messages in a re-org.

Not a suggestion, more of a question.

A Collaborator commented:

I made a similar proposal in slack :)

@fbac (Collaborator, Author) commented Jan 16, 2025:

I like that approach, too. My biggest concern is that for each group/identity message we'd be inserting into the db twice, and there would be additional storage overhead.

> Would use less storage

I'm not sure I follow, could you expand on this?

The Contributor replied:

Just that we only have to store the block hash and number once per block, and store a foreign key on each message in the block. The block number and foreign key are the same size so they cancel out, but the block hash is pure savings and bigger.

I'm assuming most blocks have > 1 message in them.
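To put rough numbers on that: a block hash is 32 bytes and a block number 8. Denormalized, every message carries both (40 bytes); relationally, each message carries only an 8-byte key while the 40-byte (number, hash) pair is stored once per block. So a block with k messages saves roughly 32·(k−1) bytes of hash storage, net of the one extra block_versions row.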
