From 30f89f1a5ab247aa400618ac7c36accbf26edcd1 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Wed, 15 Jan 2025 14:16:25 +0100 Subject: [PATCH 1/6] feat: handle L3 reorgs --- pkg/blockchain/rpcLogStreamer.go | 28 ++++++-- pkg/blockchain/rpcLogStreamer_test.go | 4 +- pkg/db/queries.sql | 4 +- pkg/db/queries/models.go | 4 ++ pkg/db/queries/queries.sql.go | 18 ++++- pkg/indexer/indexer.go | 67 ++++++++++++++++--- pkg/indexer/storer/groupMessage.go | 4 ++ .../00004_update-gateway-envelopes.sql | 6 ++ 8 files changed, 114 insertions(+), 21 deletions(-) create mode 100644 pkg/migrations/00004_update-gateway-envelopes.sql diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index 2b58edb1..c13466d1 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -48,13 +48,21 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent( contractAddress common.Address, topics []common.Hash, maxDisconnectTime time.Duration, -) <-chan types.Log { +) (<-chan types.Log, chan<- uint64) { eventChannel := make(chan types.Log, 100) + reorgChannel := make(chan uint64, 1) c.contractConfigs = append( c.contractConfigs, - contractConfig{fromBlock, contractAddress, topics, eventChannel, maxDisconnectTime}, + contractConfig{ + fromBlock, + contractAddress, + topics, + eventChannel, + reorgChannel, + maxDisconnectTime, + }, ) - return eventChannel + return eventChannel, reorgChannel } func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) { @@ -66,7 +74,8 @@ type contractConfig struct { fromBlock uint64 contractAddress common.Address topics []common.Hash - channel chan<- types.Log + eventChannel chan<- types.Log + reorgChannel <-chan uint64 maxDisconnectTime time.Duration } @@ -119,12 +128,19 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { fromBlock := watcher.fromBlock logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex())) startTime := time.Now() - defer close(watcher.channel) + defer close(watcher.eventChannel) for { select { case <-r.ctx.Done(): logger.Debug("Stopping watcher") return + case reorgBlock := <-watcher.reorgChannel: + // TODO: Implement reorg handling + // fromBlock = reorgBlock + logger.Info( + "Reorganization handling initiated from block", + zap.Uint64("block", reorgBlock), + ) default: logs, nextBlock, err := r.getNextPage(watcher, fromBlock) if err != nil { @@ -157,7 +173,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { time.Sleep(NO_LOGS_SLEEP_TIME) } for _, log := range logs { - watcher.channel <- log + watcher.eventChannel <- log } if nextBlock != nil { fromBlock = *nextBlock diff --git a/pkg/blockchain/rpcLogStreamer_test.go b/pkg/blockchain/rpcLogStreamer_test.go index 3990cc5b..24b9c947 100644 --- a/pkg/blockchain/rpcLogStreamer_test.go +++ b/pkg/blockchain/rpcLogStreamer_test.go @@ -31,7 +31,7 @@ func buildStreamer( fromBlock: fromBlock, contractAddress: address, topics: []common.Hash{topic}, - channel: channel, + eventChannel: channel, } return NewRpcLogStreamer(context.Background(), client, log, []contractConfig{cfg}), channel } @@ -79,7 +79,7 @@ func TestRpcLogStreamer(t *testing.T) { fromBlock: fromBlock, contractAddress: address, topics: []common.Hash{topic}, - channel: make(chan types.Log), + eventChannel: make(chan types.Log), } logs, nextPage, err := streamer.getNextPage(cfg, fromBlock) diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index 7d8c4d9b..c1a4d684 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -13,8 +13,8 @@ WHERE singleton_id = 1; -- name: InsertGatewayEnvelope :execrows -INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope) - VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope) +INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope, block_number, block_hash, version, is_canonical) + VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope, @block_number, @block_hash, @version, @is_canonical) ON CONFLICT DO NOTHING; diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index 6579fe5a..06f422cc 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -22,6 +22,10 @@ type GatewayEnvelope struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte + BlockNumber int64 + BlockHash []byte + Version int32 + IsCanonical bool } type LatestBlock struct { diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index a1f2e4b2..018c3154 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -139,8 +139,8 @@ func (q *Queries) InsertAddressLog(ctx context.Context, arg InsertAddressLogPara } const insertGatewayEnvelope = `-- name: InsertGatewayEnvelope :execrows -INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope) - VALUES ($1, $2, $3, $4) +INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope, block_number, block_hash, version, is_canonical) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING ` @@ -150,6 +150,10 @@ type InsertGatewayEnvelopeParams struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte + BlockNumber int64 + BlockHash []byte + Version int32 + IsCanonical bool } func (q *Queries) InsertGatewayEnvelope(ctx context.Context, arg InsertGatewayEnvelopeParams) (int64, error) { @@ -158,6 +162,10 @@ func (q *Queries) InsertGatewayEnvelope(ctx context.Context, arg InsertGatewayEn arg.OriginatorSequenceID, arg.Topic, arg.OriginatorEnvelope, + arg.BlockNumber, + arg.BlockHash, + arg.Version, + arg.IsCanonical, ) if err != nil { return 0, err @@ -235,7 +243,7 @@ func (q *Queries) RevokeAddressFromLog(ctx context.Context, arg RevokeAddressFro const selectGatewayEnvelopes = `-- name: SelectGatewayEnvelopes :many SELECT - gateway_time, originator_node_id, originator_sequence_id, topic, originator_envelope + gateway_time, originator_node_id, originator_sequence_id, topic, originator_envelope, block_number, block_hash, version, is_canonical FROM select_gateway_envelopes($1::INT[], $2::BIGINT[], $3::BYTEA[], $4::INT[], $5::INT) ` @@ -269,6 +277,10 @@ func (q *Queries) SelectGatewayEnvelopes(ctx context.Context, arg SelectGatewayE &i.OriginatorSequenceID, &i.Topic, &i.OriginatorEnvelope, + &i.BlockNumber, + &i.BlockHash, + &i.Version, + &i.IsCanonical, ); err != nil { return nil, err } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 4f12a9cd..82f2dfa5 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -1,8 +1,11 @@ package indexer import ( + "bytes" "context" "database/sql" + "encoding/hex" + "math/big" "sync" "time" @@ -86,7 +89,9 @@ func (i *Indexer) StartIndexer( indexLogs( ctx, + client, streamer.messagesChannel, + streamer.messagesReorgChannel, indexingLogger, storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract), streamer.messagesBlockTracker, @@ -107,7 +112,10 @@ func (i *Indexer) StartIndexer( With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)) indexLogs( ctx, - streamer.identityUpdatesChannel, indexingLogger, + client, + streamer.identityUpdatesChannel, + streamer.identityUpdatesReorgChannel, + indexingLogger, storer.NewIdentityUpdateStorer( db, indexingLogger, @@ -125,7 +133,9 @@ func (i *Indexer) StartIndexer( type builtStreamer struct { streamer *blockchain.RpcLogStreamer messagesChannel <-chan types.Log + messagesReorgChannel chan<- uint64 identityUpdatesChannel <-chan types.Log + identityUpdatesReorgChannel chan<- uint64 identityUpdatesBlockTracker *BlockTracker messagesBlockTracker *BlockTracker } @@ -147,7 +157,7 @@ func configureLogStream( } latestBlockNumber, _ := messagesTracker.GetLatestBlock() - messagesChannel := builder.ListenForContractEvent( + messagesChannel, messagesReorgChannel := builder.ListenForContractEvent( latestBlockNumber, common.HexToAddress(cfg.MessagesContractAddress), []common.Hash{messagesTopic}, @@ -165,7 +175,7 @@ func configureLogStream( } latestBlockNumber, _ = identityUpdatesTracker.GetLatestBlock() - identityUpdatesChannel := builder.ListenForContractEvent( + identityUpdatesChannel, identityUpdatesReorgChannel := builder.ListenForContractEvent( latestBlockNumber, common.HexToAddress(cfg.IdentityUpdatesContractAddress), []common.Hash{identityUpdatesTopic}, @@ -180,7 +190,9 @@ func configureLogStream( return &builtStreamer{ streamer: streamer, messagesChannel: messagesChannel, + messagesReorgChannel: messagesReorgChannel, identityUpdatesChannel: identityUpdatesChannel, + identityUpdatesReorgChannel: identityUpdatesReorgChannel, identityUpdatesBlockTracker: identityUpdatesTracker, messagesBlockTracker: messagesTracker, }, nil @@ -195,20 +207,59 @@ The only non-retriable errors should be things like malformed events or failed v */ func indexLogs( ctx context.Context, + client *ethclient.Client, eventChannel <-chan types.Log, + reorgChannel chan<- uint64, logger *zap.Logger, logStorer storer.LogStorer, blockTracker IBlockTracker, ) { - var err storer.LogStorageError + var errStorage storer.LogStorageError // We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled for event := range eventChannel { + storedBlockNumber, storedBlockHash := blockTracker.GetLatestBlock() + + // TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume + // idea - SafeBlockDistance uint64 `env:"SAFE_BLOCK_DISTANCE" envDefault:"100"` + if event.BlockNumber > storedBlockNumber && event.BlockNumber-storedBlockNumber < 100 { + latestBlockHash, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) + if err != nil { + logger.Error("error getting block", + zap.Uint64("blockNumber", storedBlockNumber), + zap.Error(err), + ) + + continue + } + + if !bytes.Equal(storedBlockHash, latestBlockHash.Hash().Bytes()) { + logger.Warn("blockchain reorg detected", + zap.Uint64("storedBlockNumber", storedBlockNumber), + zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)), + zap.String("onchainBlockHash", latestBlockHash.Hash().String()), + ) + + // TODO: Implement reorg handling: + // 1. Find the common ancestor block using historical gateway_envelopes block numbers and hashes + // 2. Verify current event.BlockNumber's parent hash matches expected chain + // 3. Handle the reorg from ancestor to the latest block: + // 3.1. Mark existing db logs after common ancestor with: + // - is_canonical: false + // 3.2. Use (is_canonical == true) as key for queries to detect the latest event + // 4. Send the block number to the reorg channel to trigger pulling the canonical blocks + // 4.1. New logs get marked with: + // - is_canonical: true + // - version = version + 1 + // reorgChannel <- event.BlockNumber + } + } + Retry: for { - err = logStorer.StoreLog(ctx, event) - if err != nil { - logger.Error("error storing log", zap.Error(err)) - if err.ShouldRetry() { + errStorage = logStorer.StoreLog(ctx, event) + if errStorage != nil { + logger.Error("error storing log", zap.Error(errStorage)) + if errStorage.ShouldRetry() { time.Sleep(100 * time.Millisecond) continue Retry } diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index 9c65a63b..afe24e09 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -78,6 +78,10 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String())) if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ + BlockNumber: int64(event.BlockNumber), + BlockHash: event.BlockHash.Bytes(), + Version: 1, // TODO: Make this dynamic + IsCanonical: true, // TODO: Make this dynamic // We may not want to hardcode this to 0 and have an originator ID for each smart contract? OriginatorNodeID: 0, OriginatorSequenceID: int64(msgSent.SequenceId), diff --git a/pkg/migrations/00004_update-gateway-envelopes.sql b/pkg/migrations/00004_update-gateway-envelopes.sql new file mode 100644 index 00000000..9f73f53f --- /dev/null +++ b/pkg/migrations/00004_update-gateway-envelopes.sql @@ -0,0 +1,6 @@ +ALTER TABLE gateway_envelopes + ADD COLUMN block_number BIGINT NOT NULL, + ADD COLUMN block_hash BYTEA, + ADD COLUMN version INT NOT NULL DEFAULT 1, + ADD COLUMN is_canonical BOOLEAN NOT NULL DEFAULT TRUE; + From a36a4b6455f62f2e6a461aac7fff54727b3a0746 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Wed, 15 Jan 2025 14:25:20 +0100 Subject: [PATCH 2/6] handle chan closure correctly --- pkg/blockchain/rpcLogStreamer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index c13466d1..ebb9d824 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -75,7 +75,7 @@ type contractConfig struct { contractAddress common.Address topics []common.Hash eventChannel chan<- types.Log - reorgChannel <-chan uint64 + reorgChannel chan uint64 maxDisconnectTime time.Duration } @@ -129,6 +129,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex())) startTime := time.Now() defer close(watcher.eventChannel) + defer close(watcher.reorgChannel) for { select { case <-r.ctx.Done(): From 3603d02842eb8beab6824156d7db1fe878e339c5 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Wed, 15 Jan 2025 21:02:40 +0100 Subject: [PATCH 3/6] block_hash cannot be null --- pkg/migrations/00004_update-gateway-envelopes.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/migrations/00004_update-gateway-envelopes.sql b/pkg/migrations/00004_update-gateway-envelopes.sql index 9f73f53f..41b6162f 100644 --- a/pkg/migrations/00004_update-gateway-envelopes.sql +++ b/pkg/migrations/00004_update-gateway-envelopes.sql @@ -1,6 +1,6 @@ ALTER TABLE gateway_envelopes ADD COLUMN block_number BIGINT NOT NULL, - ADD COLUMN block_hash BYTEA, + ADD COLUMN block_hash BYTEA NOT NULL, ADD COLUMN version INT NOT NULL DEFAULT 1, ADD COLUMN is_canonical BOOLEAN NOT NULL DEFAULT TRUE; From 7e6a14e0a1e7a6089b2523a6bc4e14ea2492c9fe Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Thu, 16 Jan 2025 14:50:08 +0100 Subject: [PATCH 4/6] update models, queries and storers --- pkg/blockchain/interface.go | 1 + pkg/config/options.go | 2 + pkg/db/queries.sql | 20 ++++++++ pkg/db/queries/models.go | 6 +-- pkg/db/queries/queries.sql.go | 50 +++++++++++++++++-- pkg/indexer/indexer.go | 18 ++++--- pkg/indexer/storer/groupMessage.go | 46 ++++++++++++++--- pkg/indexer/storer/identityUpdate.go | 36 ++++++++++++- pkg/indexer/storer/interface.go | 2 +- pkg/indexer/storer/logAppend.go | 49 ++++++++++++++++++ .../00004_add_blockchain_columns.down.sql | 12 +++++ .../00004_add_blockchain_columns.up.sql | 14 ++++++ .../00004_update-gateway-envelopes.sql | 6 --- 13 files changed, 233 insertions(+), 29 deletions(-) create mode 100644 pkg/indexer/storer/logAppend.go create mode 100644 pkg/migrations/00004_add_blockchain_columns.down.sql create mode 100644 pkg/migrations/00004_add_blockchain_columns.up.sql delete mode 100644 pkg/migrations/00004_update-gateway-envelopes.sql diff --git a/pkg/blockchain/interface.go b/pkg/blockchain/interface.go index 904e4f41..fc7e049e 100644 --- a/pkg/blockchain/interface.go +++ b/pkg/blockchain/interface.go @@ -30,6 +30,7 @@ type ChainClient interface { ethereum.BlockNumberReader ethereum.LogFilterer ethereum.ChainIDReader + ethereum.ChainReader } type TransactionSigner interface { diff --git a/pkg/config/options.go b/pkg/config/options.go index ea8988c1..9c7cebd7 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -16,6 +16,8 @@ type ContractsOptions struct { ChainID int `long:"chain-id" env:"XMTPD_CONTRACTS_CHAIN_ID" description:"Chain ID for the appchain" default:"31337"` RefreshInterval time.Duration `long:"refresh-interval" env:"XMTPD_CONTRACTS_REFRESH_INTERVAL" description:"Refresh interval for the nodes registry" default:"60s"` MaxChainDisconnectTime time.Duration `long:"max-chain-disconnect-time" env:"XMTPD_CONTRACTS_MAX_CHAIN_DISCONNECT_TIME" description:"Maximum time to allow the node to operate while disconnected" default:"300s"` + // TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume + SafeBlockDistance uint64 `long:"safe-block-distance" env:"XMTPD_CONTRACTS_SAFE_BLOCK_DISTANCE" description:"Safe block distance" default:"0"` } type DbOptions struct { diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index c1a4d684..9af95fec 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -122,3 +122,23 @@ FROM WHERE contract_address = @contract_address; +-- name: GetEnvelopeVersion :one +SELECT + version +FROM + gateway_envelopes +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE; + +-- name: InvalidateEnvelope :exec +UPDATE + gateway_envelopes +SET + is_canonical = FALSE +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE; + diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index 06f422cc..af3a6094 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -22,10 +22,10 @@ type GatewayEnvelope struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte - BlockNumber int64 + BlockNumber sql.NullInt64 BlockHash []byte - Version int32 - IsCanonical bool + Version sql.NullInt32 + IsCanonical sql.NullBool } type LatestBlock struct { diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index 018c3154..95e3ea67 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -75,6 +75,29 @@ func (q *Queries) GetAddressLogs(ctx context.Context, addresses []string) ([]Get return items, nil } +const getEnvelopeVersion = `-- name: GetEnvelopeVersion :one +SELECT + version +FROM + gateway_envelopes +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE +` + +type GetEnvelopeVersionParams struct { + OriginatorNodeID int32 + OriginatorSequenceID int64 +} + +func (q *Queries) GetEnvelopeVersion(ctx context.Context, arg GetEnvelopeVersionParams) (sql.NullInt32, error) { + row := q.db.QueryRowContext(ctx, getEnvelopeVersion, arg.OriginatorNodeID, arg.OriginatorSequenceID) + var version sql.NullInt32 + err := row.Scan(&version) + return version, err +} + const getLatestBlock = `-- name: GetLatestBlock :one SELECT block_number, @@ -150,10 +173,10 @@ type InsertGatewayEnvelopeParams struct { OriginatorSequenceID int64 Topic []byte OriginatorEnvelope []byte - BlockNumber int64 + BlockNumber sql.NullInt64 BlockHash []byte - Version int32 - IsCanonical bool + Version sql.NullInt32 + IsCanonical sql.NullBool } func (q *Queries) InsertGatewayEnvelope(ctx context.Context, arg InsertGatewayEnvelopeParams) (int64, error) { @@ -217,6 +240,27 @@ func (q *Queries) InsertStagedOriginatorEnvelope(ctx context.Context, arg Insert return i, err } +const invalidateEnvelope = `-- name: InvalidateEnvelope :exec +UPDATE + gateway_envelopes +SET + is_canonical = FALSE +WHERE + originator_node_id = $1 + AND originator_sequence_id = $2 + AND is_canonical = TRUE +` + +type InvalidateEnvelopeParams struct { + OriginatorNodeID int32 + OriginatorSequenceID int64 +} + +func (q *Queries) InvalidateEnvelope(ctx context.Context, arg InvalidateEnvelopeParams) error { + _, err := q.db.ExecContext(ctx, invalidateEnvelope, arg.OriginatorNodeID, arg.OriginatorSequenceID) + return err +} + const revokeAddressFromLog = `-- name: RevokeAddressFromLog :execrows UPDATE address_log diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 82f2dfa5..877f4f3a 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -92,6 +92,7 @@ func (i *Indexer) StartIndexer( client, streamer.messagesChannel, streamer.messagesReorgChannel, + cfg.SafeBlockDistance, indexingLogger, storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract), streamer.messagesBlockTracker, @@ -115,6 +116,7 @@ func (i *Indexer) StartIndexer( client, streamer.identityUpdatesChannel, streamer.identityUpdatesReorgChannel, + cfg.SafeBlockDistance, indexingLogger, storer.NewIdentityUpdateStorer( db, @@ -207,22 +209,24 @@ The only non-retriable errors should be things like malformed events or failed v */ func indexLogs( ctx context.Context, - client *ethclient.Client, + client blockchain.ChainClient, eventChannel <-chan types.Log, reorgChannel chan<- uint64, + safeBlockDistance uint64, logger *zap.Logger, logStorer storer.LogStorer, blockTracker IBlockTracker, ) { var errStorage storer.LogStorageError + // We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled for event := range eventChannel { storedBlockNumber, storedBlockHash := blockTracker.GetLatestBlock() // TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume - // idea - SafeBlockDistance uint64 `env:"SAFE_BLOCK_DISTANCE" envDefault:"100"` - if event.BlockNumber > storedBlockNumber && event.BlockNumber-storedBlockNumber < 100 { - latestBlockHash, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) + if event.BlockNumber > storedBlockNumber && + event.BlockNumber-storedBlockNumber < safeBlockDistance { + latestBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber))) if err != nil { logger.Error("error getting block", zap.Uint64("blockNumber", storedBlockNumber), @@ -232,11 +236,11 @@ func indexLogs( continue } - if !bytes.Equal(storedBlockHash, latestBlockHash.Hash().Bytes()) { + if !bytes.Equal(storedBlockHash, latestBlock.Hash().Bytes()) { logger.Warn("blockchain reorg detected", zap.Uint64("storedBlockNumber", storedBlockNumber), zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)), - zap.String("onchainBlockHash", latestBlockHash.Hash().String()), + zap.String("onchainBlockHash", latestBlock.Hash().String()), ) // TODO: Implement reorg handling: @@ -256,7 +260,7 @@ func indexLogs( Retry: for { - errStorage = logStorer.StoreLog(ctx, event) + errStorage = logStorer.StoreLog(ctx, event, false) if errStorage != nil { logger.Error("error storing log", zap.Error(errStorage)) if errStorage.ShouldRetry() { diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index afe24e09..a1613ed1 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -2,6 +2,7 @@ package storer import ( "context" + "database/sql" "errors" "github.com/ethereum/go-ethereum/core/types" @@ -13,6 +14,11 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + // We may not want to hardcode this to 0 and have an originator ID for each smart contract? + GROUP_MESSAGE_ORIGINATOR_ID = 0 +) + type GroupMessageStorer struct { contract *groupmessages.GroupMessages queries *queries.Queries @@ -32,7 +38,11 @@ func NewGroupMessageStorer( } // Validate and store a group message log event -func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *GroupMessageStorer) StoreLog( + ctx context.Context, + event types.Log, + appendLog bool, +) LogStorageError { msgSent, err := s.contract.ParseMessageSent(event) if err != nil { return NewLogStorageError(err, false) @@ -75,15 +85,37 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS return NewLogStorageError(err, false) } + version := sql.NullInt32{Int32: 1, Valid: true} + + if appendLog { + version, err = GetVersionForAppend( + ctx, + s.queries, + s.logger, + GROUP_MESSAGE_ORIGINATOR_ID, + int64(msgSent.SequenceId), + ) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return NewLogStorageError(err, true) + } + if errors.Is(err, sql.ErrNoRows) { + s.logger.Debug("No rows found for envelope, inserting new", + zap.Int("originator_node_id", GROUP_MESSAGE_ORIGINATOR_ID), + zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)), + ) + } + } + } + s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String())) if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - BlockNumber: int64(event.BlockNumber), - BlockHash: event.BlockHash.Bytes(), - Version: 1, // TODO: Make this dynamic - IsCanonical: true, // TODO: Make this dynamic - // We may not want to hardcode this to 0 and have an originator ID for each smart contract? - OriginatorNodeID: 0, + BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true}, + BlockHash: event.BlockHash.Bytes(), + Version: version, + IsCanonical: sql.NullBool{Bool: true, Valid: true}, + OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: topicStruct.Bytes(), OriginatorEnvelope: originatorEnvelopeBytes, diff --git a/pkg/indexer/storer/identityUpdate.go b/pkg/indexer/storer/identityUpdate.go index 737923fc..daf29fdf 100644 --- a/pkg/indexer/storer/identityUpdate.go +++ b/pkg/indexer/storer/identityUpdate.go @@ -3,6 +3,7 @@ package storer import ( "context" "database/sql" + "errors" "fmt" "time" @@ -23,6 +24,7 @@ import ( ) const ( + // We may not want to hardcode this to 1 and have an originator ID for each smart contract? IDENTITY_UPDATE_ORIGINATOR_ID = 1 ) @@ -48,7 +50,11 @@ func NewIdentityUpdateStorer( } // Validate and store an identity update log event -func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { +func (s *IdentityUpdateStorer) StoreLog( + ctx context.Context, + event types.Log, + appendLog bool, +) LogStorageError { msgSent, err := s.contract.ParseIdentityUpdateCreated(event) if err != nil { return NewLogStorageError(err, false) @@ -166,8 +172,34 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo return NewLogStorageError(err, true) } + version := sql.NullInt32{Int32: 1, Valid: true} + + if appendLog { + version, err = GetVersionForAppend( + ctx, + querier, + s.logger, + IDENTITY_UPDATE_ORIGINATOR_ID, + int64(msgSent.SequenceId), + ) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return NewLogStorageError(err, true) + } + if errors.Is(err, sql.ErrNoRows) { + s.logger.Debug("No rows found for envelope, inserting new", + zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID), + zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)), + ) + } + } + } + if _, err = querier.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ - // We may not want to hardcode this to 1 and have an originator ID for each smart contract? + BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true}, + BlockHash: event.BlockHash.Bytes(), + Version: version, + IsCanonical: sql.NullBool{Bool: true, Valid: true}, OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID, OriginatorSequenceID: int64(msgSent.SequenceId), Topic: messageTopic.Bytes(), diff --git a/pkg/indexer/storer/interface.go b/pkg/indexer/storer/interface.go index 71a1651e..aedb569e 100644 --- a/pkg/indexer/storer/interface.go +++ b/pkg/indexer/storer/interface.go @@ -8,5 +8,5 @@ import ( // Takes a log event and stores it, returning either an error that may be retriable, non-retriable, or nil type LogStorer interface { - StoreLog(ctx context.Context, event types.Log) LogStorageError + StoreLog(ctx context.Context, event types.Log, appendLog bool) LogStorageError } diff --git a/pkg/indexer/storer/logAppend.go b/pkg/indexer/storer/logAppend.go new file mode 100644 index 00000000..73118913 --- /dev/null +++ b/pkg/indexer/storer/logAppend.go @@ -0,0 +1,49 @@ +package storer + +import ( + "context" + "database/sql" + "errors" + + "github.com/xmtp/xmtpd/pkg/db/queries" + "go.uber.org/zap" +) + +func GetVersionForAppend( + ctx context.Context, + querier *queries.Queries, + logger *zap.Logger, + originatorNodeID int32, + sequenceID int64, +) (sql.NullInt32, error) { + var version sql.NullInt32 + currentVersion, err := querier.GetEnvelopeVersion(ctx, queries.GetEnvelopeVersionParams{ + OriginatorNodeID: originatorNodeID, + OriginatorSequenceID: sequenceID, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + logger.Error("Error getting current version", zap.Error(err)) + return version, err + } + + if errors.Is(err, sql.ErrNoRows) { + return version, err + } + + if err == nil { + if err = querier.InvalidateEnvelope(ctx, queries.InvalidateEnvelopeParams{ + OriginatorNodeID: originatorNodeID, + OriginatorSequenceID: sequenceID, + }); err != nil { + logger.Error("Error invalidating old envelope", zap.Error(err)) + return version, err + } + + version = sql.NullInt32{ + Int32: currentVersion.Int32 + 1, + Valid: true, + } + } + + return version, nil +} diff --git a/pkg/migrations/00004_add_blockchain_columns.down.sql b/pkg/migrations/00004_add_blockchain_columns.down.sql new file mode 100644 index 00000000..d395e683 --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.down.sql @@ -0,0 +1,12 @@ +-- Drop everything in reverse order +DROP INDEX IF EXISTS idx_gateway_envelopes_reorg; + +ALTER TABLE gateway_envelopes + DROP CONSTRAINT IF EXISTS blockchain_message_constraint; + +ALTER TABLE gateway_envelopes + DROP COLUMN IF EXISTS block_number, + DROP COLUMN IF EXISTS block_hash, + DROP COLUMN IF EXISTS version, + DROP COLUMN IF EXISTS is_canonical; + diff --git a/pkg/migrations/00004_add_blockchain_columns.up.sql b/pkg/migrations/00004_add_blockchain_columns.up.sql new file mode 100644 index 00000000..da18a2ee --- /dev/null +++ b/pkg/migrations/00004_add_blockchain_columns.up.sql @@ -0,0 +1,14 @@ +-- Add blockchain-related columns and constraint +ALTER TABLE gateway_envelopes + ADD COLUMN block_number BIGINT, + ADD COLUMN block_hash BYTEA, + ADD COLUMN version INT, + ADD COLUMN is_canonical BOOLEAN; + +ALTER TABLE gateway_envelopes + ADD CONSTRAINT blockchain_message_constraint CHECK ((block_number IS NULL AND block_hash IS NULL AND version IS NULL AND is_canonical IS NULL) OR (block_number IS NOT NULL AND block_hash IS NOT NULL AND version IS NOT NULL AND is_canonical IS NOT NULL)); + +CREATE INDEX idx_gateway_envelopes_reorg ON gateway_envelopes(block_number DESC, block_hash) +WHERE + block_number IS NOT NULL AND is_canonical = TRUE; + diff --git a/pkg/migrations/00004_update-gateway-envelopes.sql b/pkg/migrations/00004_update-gateway-envelopes.sql deleted file mode 100644 index 41b6162f..00000000 --- a/pkg/migrations/00004_update-gateway-envelopes.sql +++ /dev/null @@ -1,6 +0,0 @@ -ALTER TABLE gateway_envelopes - ADD COLUMN block_number BIGINT NOT NULL, - ADD COLUMN block_hash BYTEA NOT NULL, - ADD COLUMN version INT NOT NULL DEFAULT 1, - ADD COLUMN is_canonical BOOLEAN NOT NULL DEFAULT TRUE; - From 7f8e8464bfd4fdb58e40f34ff4cf340bcae1d9c8 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Thu, 16 Jan 2025 16:08:34 +0100 Subject: [PATCH 5/6] fix test --- pkg/blockchain/rpcLogStreamer.go | 4 + pkg/blockchain/rpcLogStreamer_test.go | 2 +- pkg/indexer/indexer.go | 4 +- pkg/indexer/indexer_test.go | 91 +++- pkg/indexer/storer/groupMessage_test.go | 5 +- pkg/indexer/storer/identityUpdate_test.go | 1 + pkg/interceptors/server/auth_test.go | 8 +- pkg/mocks/authn/mock_JWTVerifier.go | 24 +- pkg/mocks/blockchain/mock_ChainClient.go | 414 ++++++++++++++++++ pkg/mocks/storer/mock_LogStorer.go | 21 +- pkg/proto/identity/api/v1/identity.pb.go | 2 +- .../identity/associations/association.pb.go | 2 +- .../identity/associations/signature.pb.go | 2 +- pkg/proto/identity/credential.pb.go | 2 +- pkg/proto/keystore_api/v1/keystore.pb.go | 2 +- pkg/proto/message_api/v1/authn.pb.go | 2 +- pkg/proto/message_api/v1/message_api.pb.go | 2 +- pkg/proto/message_contents/ciphertext.pb.go | 2 +- pkg/proto/message_contents/composite.pb.go | 2 +- pkg/proto/message_contents/contact.pb.go | 2 +- pkg/proto/message_contents/content.pb.go | 2 +- .../conversation_reference.pb.go | 2 +- pkg/proto/message_contents/ecies.pb.go | 2 +- pkg/proto/message_contents/frames.pb.go | 2 +- pkg/proto/message_contents/invitation.pb.go | 2 +- pkg/proto/message_contents/message.pb.go | 2 +- pkg/proto/message_contents/private_key.pb.go | 2 +- .../private_preferences.pb.go | 2 +- pkg/proto/message_contents/public_key.pb.go | 2 +- pkg/proto/message_contents/signature.pb.go | 2 +- .../message_contents/signed_payload.pb.go | 2 +- pkg/proto/mls/api/v1/mls.pb.go | 2 +- pkg/proto/mls/database/intents.pb.go | 2 +- pkg/proto/mls/message_contents/content.pb.go | 2 +- .../content_types/reaction.pb.go | 2 +- .../message_contents/group_membership.pb.go | 2 +- .../mls/message_contents/group_metadata.pb.go | 2 +- .../group_mutable_metadata.pb.go | 2 +- .../message_contents/group_permissions.pb.go | 2 +- .../transcript_messages.pb.go | 2 +- pkg/proto/mls_validation/v1/service.pb.go | 2 +- pkg/proto/xmtpv4/envelopes/envelopes.pb.go | 2 +- .../xmtpv4/message_api/message_api.pb.go | 2 +- .../xmtpv4/message_api/misbehavior_api.pb.go | 2 +- pkg/proto/xmtpv4/payer_api/payer_api.pb.go | 2 +- 45 files changed, 568 insertions(+), 76 deletions(-) diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index ebb9d824..73c435a7 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -228,6 +228,10 @@ func (r *RpcLogStreamer) getNextPage( return logs, &nextBlockNumber, nil } +func (r *RpcLogStreamer) Client() ChainClient { + return r.client +} + func buildFilterQuery( contractConfig contractConfig, fromBlock int64, diff --git a/pkg/blockchain/rpcLogStreamer_test.go b/pkg/blockchain/rpcLogStreamer_test.go index 24b9c947..2489d2f0 100644 --- a/pkg/blockchain/rpcLogStreamer_test.go +++ b/pkg/blockchain/rpcLogStreamer_test.go @@ -41,7 +41,7 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) builder := NewRpcLogStreamBuilder(context.Background(), testclient, testutils.NewLog(t)) - listenerChannel := builder.ListenForContractEvent( + listenerChannel, _ := builder.ListenForContractEvent( 1, testutils.RandomAddress(), []common.Hash{testutils.RandomLogTopic()}, 5*time.Minute, diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 877f4f3a..3ae811c7 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -89,7 +89,7 @@ func (i *Indexer) StartIndexer( indexLogs( ctx, - client, + streamer.streamer.Client(), streamer.messagesChannel, streamer.messagesReorgChannel, cfg.SafeBlockDistance, @@ -113,7 +113,7 @@ func (i *Indexer) StartIndexer( With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)) indexLogs( ctx, - client, + streamer.streamer.Client(), streamer.identityUpdatesChannel, streamer.identityUpdatesReorgChannel, cfg.SafeBlockDistance, diff --git a/pkg/indexer/indexer_test.go b/pkg/indexer/indexer_test.go index 753862c8..77627786 100644 --- a/pkg/indexer/indexer_test.go +++ b/pkg/indexer/indexer_test.go @@ -10,59 +10,118 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" "github.com/xmtp/xmtpd/pkg/indexer/storer" + blockchainMocks "github.com/xmtp/xmtpd/pkg/mocks/blockchain" indexerMocks "github.com/xmtp/xmtpd/pkg/mocks/indexer" storerMocks "github.com/xmtp/xmtpd/pkg/mocks/storer" "github.com/xmtp/xmtpd/pkg/testutils" ) +const testSafeBlockDistance = uint64(10) + func TestIndexLogsSuccess(t *testing.T) { channel := make(chan types.Log, 10) - defer close(channel) + reorgChannel := make(chan uint64, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + close(channel) + close(reorgChannel) + }() + newBlockNumber := uint64(10) newBlockHash := common.HexToHash( "0x0000000000000000000000000000000000000000000000000000000000000000", ) - logStorer := storerMocks.NewMockLogStorer(t) - blockTracker := indexerMocks.NewMockIBlockTracker(t) - blockTracker.EXPECT(). - UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()). - Return(nil) - event := types.Log{ Address: common.HexToAddress("0x123"), BlockNumber: newBlockNumber, + BlockHash: newBlockHash, } - logStorer.EXPECT().StoreLog(mock.Anything, event).Times(1).Return(nil) + channel <- event - go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker) + mockClient := blockchainMocks.NewMockChainClient(t) + + blockTracker := indexerMocks.NewMockIBlockTracker(t) + blockTracker.EXPECT(). + UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()). + Return(nil) + blockTracker.EXPECT(). + GetLatestBlock(). + Return(newBlockNumber, newBlockHash.Bytes()) + + logStorer := storerMocks.NewMockLogStorer(t) + logStorer.EXPECT(). + StoreLog(mock.Anything, event, false). + Return(nil) + + go indexLogs( + ctx, + mockClient, + channel, + reorgChannel, + testSafeBlockDistance, + testutils.NewLog(t), + logStorer, + blockTracker, + ) + time.Sleep(100 * time.Millisecond) } func TestIndexLogsRetryableError(t *testing.T) { channel := make(chan types.Log, 10) - defer close(channel) + reorgChannel := make(chan uint64, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + close(channel) + close(reorgChannel) + }() - logStorer := storerMocks.NewMockLogStorer(t) - blockTracker := indexerMocks.NewMockIBlockTracker(t) + newBlockNumber := uint64(10) + newBlockHash := common.HexToHash( + "0x0000000000000000000000000000000000000000000000000000000000000000", + ) event := types.Log{ - Address: common.HexToAddress("0x123"), + Address: common.HexToAddress("0x123"), + BlockNumber: newBlockNumber, + BlockHash: newBlockHash, } + mockClient := blockchainMocks.NewMockChainClient(t) + logStorer := storerMocks.NewMockLogStorer(t) + + blockTracker := indexerMocks.NewMockIBlockTracker(t) + blockTracker.EXPECT(). + GetLatestBlock(). + Return(newBlockNumber, newBlockHash.Bytes()) + // Will fail for the first call with a retryable error and a non-retryable error on the second call attemptNumber := 0 logStorer.EXPECT(). - StoreLog(mock.Anything, event). - RunAndReturn(func(ctx context.Context, log types.Log) storer.LogStorageError { + StoreLog(mock.Anything, event, false). + RunAndReturn(func(ctx context.Context, log types.Log, isCanonical bool) storer.LogStorageError { attemptNumber++ return storer.NewLogStorageError(errors.New("retryable error"), attemptNumber < 2) }) + channel <- event - go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker) + go indexLogs( + ctx, + mockClient, + channel, + reorgChannel, + testSafeBlockDistance, + testutils.NewLog(t), + logStorer, + blockTracker, + ) + time.Sleep(200 * time.Millisecond) logStorer.AssertNumberOfCalls(t, "StoreLog", 2) diff --git a/pkg/indexer/storer/groupMessage_test.go b/pkg/indexer/storer/groupMessage_test.go index 6e82ef6a..03b8a99c 100644 --- a/pkg/indexer/storer/groupMessage_test.go +++ b/pkg/indexer/storer/groupMessage_test.go @@ -56,6 +56,7 @@ func TestStoreGroupMessages(t *testing.T) { err = storer.StoreLog( ctx, logMessage, + false, ) require.NoError(t, err) @@ -99,12 +100,14 @@ func TestStoreGroupMessageDuplicate(t *testing.T) { err := storer.StoreLog( ctx, logMessage, + false, ) require.NoError(t, err) // Store the log a second time err = storer.StoreLog( ctx, logMessage, + false, ) require.NoError(t, err) @@ -133,7 +136,7 @@ func TestStoreGroupMessageMalformed(t *testing.T) { Data: []byte("foo"), } - storageErr := storer.StoreLog(ctx, logMessage) + storageErr := storer.StoreLog(ctx, logMessage, false) require.Error(t, storageErr) require.False(t, storageErr.ShouldRetry()) } diff --git a/pkg/indexer/storer/identityUpdate_test.go b/pkg/indexer/storer/identityUpdate_test.go index 5426912c..9e519b74 100644 --- a/pkg/indexer/storer/identityUpdate_test.go +++ b/pkg/indexer/storer/identityUpdate_test.go @@ -79,6 +79,7 @@ func TestStoreIdentityUpdate(t *testing.T) { err := storer.StoreLog( ctx, logMessage, + false, ) require.NoError(t, err) diff --git a/pkg/interceptors/server/auth_test.go b/pkg/interceptors/server/auth_test.go index 5773d1e0..01d890f4 100644 --- a/pkg/interceptors/server/auth_test.go +++ b/pkg/interceptors/server/auth_test.go @@ -36,7 +36,7 @@ func TestUnaryInterceptor(t *testing.T) { return metadata.NewIncomingContext(context.Background(), md) }, setupVerifier: func() { - mockVerifier.EXPECT().Verify("valid_token").Return(nil) + mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil) }, wantError: nil, wantVerifiedNode: true, @@ -71,7 +71,7 @@ func TestUnaryInterceptor(t *testing.T) { setupVerifier: func() { mockVerifier.EXPECT(). Verify("invalid_token"). - Return(errors.New("invalid signature")) + Return(uint32(0), errors.New("invalid signature")) }, wantError: status.Error( codes.Unauthenticated, @@ -131,7 +131,7 @@ func TestStreamInterceptor(t *testing.T) { return metadata.NewIncomingContext(context.Background(), md) }, setupVerifier: func() { - mockVerifier.EXPECT().Verify("valid_token").Return(nil) + mockVerifier.EXPECT().Verify("valid_token").Return(uint32(0), nil) }, wantError: nil, wantVerifiedNode: true, @@ -156,7 +156,7 @@ func TestStreamInterceptor(t *testing.T) { setupVerifier: func() { mockVerifier.EXPECT(). Verify("invalid_token"). - Return(errors.New("invalid signature")) + Return(uint32(0), errors.New("invalid signature")) }, wantError: status.Error( codes.Unauthenticated, diff --git a/pkg/mocks/authn/mock_JWTVerifier.go b/pkg/mocks/authn/mock_JWTVerifier.go index c1f35df3..a4c5d572 100644 --- a/pkg/mocks/authn/mock_JWTVerifier.go +++ b/pkg/mocks/authn/mock_JWTVerifier.go @@ -25,14 +25,24 @@ func (_m *MockJWTVerifier) Verify(tokenString string) (uint32, error) { panic("no return value specified for Verify") } - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(string) (uint32, error)); ok { + return rf(tokenString) + } + if rf, ok := ret.Get(0).(func(string) uint32); ok { r0 = rf(tokenString) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(tokenString) + } else { + r1 = ret.Error(1) } - return 0, r0 + return r0, r1 } // MockJWTVerifier_Verify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Verify' @@ -53,12 +63,12 @@ func (_c *MockJWTVerifier_Verify_Call) Run(run func(tokenString string)) *MockJW return _c } -func (_c *MockJWTVerifier_Verify_Call) Return(_a0 error) *MockJWTVerifier_Verify_Call { - _c.Call.Return(_a0) +func (_c *MockJWTVerifier_Verify_Call) Return(_a0 uint32, _a1 error) *MockJWTVerifier_Verify_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockJWTVerifier_Verify_Call) RunAndReturn(run func(string) error) *MockJWTVerifier_Verify_Call { +func (_c *MockJWTVerifier_Verify_Call) RunAndReturn(run func(string) (uint32, error)) *MockJWTVerifier_Verify_Call { _c.Call.Return(run) return _c } diff --git a/pkg/mocks/blockchain/mock_ChainClient.go b/pkg/mocks/blockchain/mock_ChainClient.go index b917221e..99a96fbd 100644 --- a/pkg/mocks/blockchain/mock_ChainClient.go +++ b/pkg/mocks/blockchain/mock_ChainClient.go @@ -5,6 +5,8 @@ package blockchain import ( big "math/big" + common "github.com/ethereum/go-ethereum/common" + context "context" ethereum "github.com/ethereum/go-ethereum" @@ -27,6 +29,124 @@ func (_m *MockChainClient) EXPECT() *MockChainClient_Expecter { return &MockChainClient_Expecter{mock: &_m.Mock} } +// BlockByHash provides a mock function with given fields: ctx, hash +func (_m *MockChainClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for BlockByHash") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Block, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Block); ok { + r0 = rf(ctx, hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_BlockByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByHash' +type MockChainClient_BlockByHash_Call struct { + *mock.Call +} + +// BlockByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash common.Hash +func (_e *MockChainClient_Expecter) BlockByHash(ctx interface{}, hash interface{}) *MockChainClient_BlockByHash_Call { + return &MockChainClient_BlockByHash_Call{Call: _e.mock.On("BlockByHash", ctx, hash)} +} + +func (_c *MockChainClient_BlockByHash_Call) Run(run func(ctx context.Context, hash common.Hash)) *MockChainClient_BlockByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_BlockByHash_Call) Return(_a0 *types.Block, _a1 error) *MockChainClient_BlockByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_BlockByHash_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.Block, error)) *MockChainClient_BlockByHash_Call { + _c.Call.Return(run) + return _c +} + +// BlockByNumber provides a mock function with given fields: ctx, number +func (_m *MockChainClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + + var r0 *types.Block + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Block); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_BlockByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockByNumber' +type MockChainClient_BlockByNumber_Call struct { + *mock.Call +} + +// BlockByNumber is a helper method to define mock.On call +// - ctx context.Context +// - number *big.Int +func (_e *MockChainClient_Expecter) BlockByNumber(ctx interface{}, number interface{}) *MockChainClient_BlockByNumber_Call { + return &MockChainClient_BlockByNumber_Call{Call: _e.mock.On("BlockByNumber", ctx, number)} +} + +func (_c *MockChainClient_BlockByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *MockChainClient_BlockByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockChainClient_BlockByNumber_Call) Return(_a0 *types.Block, _a1 error) *MockChainClient_BlockByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_BlockByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Block, error)) *MockChainClient_BlockByNumber_Call { + _c.Call.Return(run) + return _c +} + // BlockNumber provides a mock function with given fields: ctx func (_m *MockChainClient) BlockNumber(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) @@ -200,6 +320,124 @@ func (_c *MockChainClient_FilterLogs_Call) RunAndReturn(run func(context.Context return _c } +// HeaderByHash provides a mock function with given fields: ctx, hash +func (_m *MockChainClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for HeaderByHash") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Header); ok { + r0 = rf(ctx, hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_HeaderByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HeaderByHash' +type MockChainClient_HeaderByHash_Call struct { + *mock.Call +} + +// HeaderByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash common.Hash +func (_e *MockChainClient_Expecter) HeaderByHash(ctx interface{}, hash interface{}) *MockChainClient_HeaderByHash_Call { + return &MockChainClient_HeaderByHash_Call{Call: _e.mock.On("HeaderByHash", ctx, hash)} +} + +func (_c *MockChainClient_HeaderByHash_Call) Run(run func(ctx context.Context, hash common.Hash)) *MockChainClient_HeaderByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_HeaderByHash_Call) Return(_a0 *types.Header, _a1 error) *MockChainClient_HeaderByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_HeaderByHash_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.Header, error)) *MockChainClient_HeaderByHash_Call { + _c.Call.Return(run) + return _c +} + +// HeaderByNumber provides a mock function with given fields: ctx, number +func (_m *MockChainClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_HeaderByNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HeaderByNumber' +type MockChainClient_HeaderByNumber_Call struct { + *mock.Call +} + +// HeaderByNumber is a helper method to define mock.On call +// - ctx context.Context +// - number *big.Int +func (_e *MockChainClient_Expecter) HeaderByNumber(ctx interface{}, number interface{}) *MockChainClient_HeaderByNumber_Call { + return &MockChainClient_HeaderByNumber_Call{Call: _e.mock.On("HeaderByNumber", ctx, number)} +} + +func (_c *MockChainClient_HeaderByNumber_Call) Run(run func(ctx context.Context, number *big.Int)) *MockChainClient_HeaderByNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockChainClient_HeaderByNumber_Call) Return(_a0 *types.Header, _a1 error) *MockChainClient_HeaderByNumber_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_HeaderByNumber_Call) RunAndReturn(run func(context.Context, *big.Int) (*types.Header, error)) *MockChainClient_HeaderByNumber_Call { + _c.Call.Return(run) + return _c +} + // SubscribeFilterLogs provides a mock function with given fields: ctx, q, ch func (_m *MockChainClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { ret := _m.Called(ctx, q, ch) @@ -260,6 +498,182 @@ func (_c *MockChainClient_SubscribeFilterLogs_Call) RunAndReturn(run func(contex return _c } +// SubscribeNewHead provides a mock function with given fields: ctx, ch +func (_m *MockChainClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + ret := _m.Called(ctx, ch) + + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + + var r0 ethereum.Subscription + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { + return rf(ctx, ch) + } + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) ethereum.Subscription); ok { + r0 = rf(ctx, ch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ethereum.Subscription) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, chan<- *types.Header) error); ok { + r1 = rf(ctx, ch) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_SubscribeNewHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubscribeNewHead' +type MockChainClient_SubscribeNewHead_Call struct { + *mock.Call +} + +// SubscribeNewHead is a helper method to define mock.On call +// - ctx context.Context +// - ch chan<- *types.Header +func (_e *MockChainClient_Expecter) SubscribeNewHead(ctx interface{}, ch interface{}) *MockChainClient_SubscribeNewHead_Call { + return &MockChainClient_SubscribeNewHead_Call{Call: _e.mock.On("SubscribeNewHead", ctx, ch)} +} + +func (_c *MockChainClient_SubscribeNewHead_Call) Run(run func(ctx context.Context, ch chan<- *types.Header)) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(chan<- *types.Header)) + }) + return _c +} + +func (_c *MockChainClient_SubscribeNewHead_Call) Return(_a0 ethereum.Subscription, _a1 error) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_SubscribeNewHead_Call) RunAndReturn(run func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)) *MockChainClient_SubscribeNewHead_Call { + _c.Call.Return(run) + return _c +} + +// TransactionCount provides a mock function with given fields: ctx, blockHash +func (_m *MockChainClient) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { + ret := _m.Called(ctx, blockHash) + + if len(ret) == 0 { + panic("no return value specified for TransactionCount") + } + + var r0 uint + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (uint, error)); ok { + return rf(ctx, blockHash) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) uint); ok { + r0 = rf(ctx, blockHash) + } else { + r0 = ret.Get(0).(uint) + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, blockHash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_TransactionCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransactionCount' +type MockChainClient_TransactionCount_Call struct { + *mock.Call +} + +// TransactionCount is a helper method to define mock.On call +// - ctx context.Context +// - blockHash common.Hash +func (_e *MockChainClient_Expecter) TransactionCount(ctx interface{}, blockHash interface{}) *MockChainClient_TransactionCount_Call { + return &MockChainClient_TransactionCount_Call{Call: _e.mock.On("TransactionCount", ctx, blockHash)} +} + +func (_c *MockChainClient_TransactionCount_Call) Run(run func(ctx context.Context, blockHash common.Hash)) *MockChainClient_TransactionCount_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash)) + }) + return _c +} + +func (_c *MockChainClient_TransactionCount_Call) Return(_a0 uint, _a1 error) *MockChainClient_TransactionCount_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_TransactionCount_Call) RunAndReturn(run func(context.Context, common.Hash) (uint, error)) *MockChainClient_TransactionCount_Call { + _c.Call.Return(run) + return _c +} + +// TransactionInBlock provides a mock function with given fields: ctx, blockHash, index +func (_m *MockChainClient) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { + ret := _m.Called(ctx, blockHash, index) + + if len(ret) == 0 { + panic("no return value specified for TransactionInBlock") + } + + var r0 *types.Transaction + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) (*types.Transaction, error)); ok { + return rf(ctx, blockHash, index) + } + if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) *types.Transaction); ok { + r0 = rf(ctx, blockHash, index) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Transaction) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, common.Hash, uint) error); ok { + r1 = rf(ctx, blockHash, index) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockChainClient_TransactionInBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TransactionInBlock' +type MockChainClient_TransactionInBlock_Call struct { + *mock.Call +} + +// TransactionInBlock is a helper method to define mock.On call +// - ctx context.Context +// - blockHash common.Hash +// - index uint +func (_e *MockChainClient_Expecter) TransactionInBlock(ctx interface{}, blockHash interface{}, index interface{}) *MockChainClient_TransactionInBlock_Call { + return &MockChainClient_TransactionInBlock_Call{Call: _e.mock.On("TransactionInBlock", ctx, blockHash, index)} +} + +func (_c *MockChainClient_TransactionInBlock_Call) Run(run func(ctx context.Context, blockHash common.Hash, index uint)) *MockChainClient_TransactionInBlock_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(common.Hash), args[2].(uint)) + }) + return _c +} + +func (_c *MockChainClient_TransactionInBlock_Call) Return(_a0 *types.Transaction, _a1 error) *MockChainClient_TransactionInBlock_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockChainClient_TransactionInBlock_Call) RunAndReturn(run func(context.Context, common.Hash, uint) (*types.Transaction, error)) *MockChainClient_TransactionInBlock_Call { + _c.Call.Return(run) + return _c +} + // NewMockChainClient creates a new instance of MockChainClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockChainClient(t interface { diff --git a/pkg/mocks/storer/mock_LogStorer.go b/pkg/mocks/storer/mock_LogStorer.go index 516a6d96..6bf0a2b1 100644 --- a/pkg/mocks/storer/mock_LogStorer.go +++ b/pkg/mocks/storer/mock_LogStorer.go @@ -24,17 +24,17 @@ func (_m *MockLogStorer) EXPECT() *MockLogStorer_Expecter { return &MockLogStorer_Expecter{mock: &_m.Mock} } -// StoreLog provides a mock function with given fields: ctx, event -func (_m *MockLogStorer) StoreLog(ctx context.Context, event types.Log) storer.LogStorageError { - ret := _m.Called(ctx, event) +// StoreLog provides a mock function with given fields: ctx, event, appendLog +func (_m *MockLogStorer) StoreLog(ctx context.Context, event types.Log, appendLog bool) storer.LogStorageError { + ret := _m.Called(ctx, event, appendLog) if len(ret) == 0 { panic("no return value specified for StoreLog") } var r0 storer.LogStorageError - if rf, ok := ret.Get(0).(func(context.Context, types.Log) storer.LogStorageError); ok { - r0 = rf(ctx, event) + if rf, ok := ret.Get(0).(func(context.Context, types.Log, bool) storer.LogStorageError); ok { + r0 = rf(ctx, event, appendLog) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(storer.LogStorageError) @@ -52,13 +52,14 @@ type MockLogStorer_StoreLog_Call struct { // StoreLog is a helper method to define mock.On call // - ctx context.Context // - event types.Log -func (_e *MockLogStorer_Expecter) StoreLog(ctx interface{}, event interface{}) *MockLogStorer_StoreLog_Call { - return &MockLogStorer_StoreLog_Call{Call: _e.mock.On("StoreLog", ctx, event)} +// - appendLog bool +func (_e *MockLogStorer_Expecter) StoreLog(ctx interface{}, event interface{}, appendLog interface{}) *MockLogStorer_StoreLog_Call { + return &MockLogStorer_StoreLog_Call{Call: _e.mock.On("StoreLog", ctx, event, appendLog)} } -func (_c *MockLogStorer_StoreLog_Call) Run(run func(ctx context.Context, event types.Log)) *MockLogStorer_StoreLog_Call { +func (_c *MockLogStorer_StoreLog_Call) Run(run func(ctx context.Context, event types.Log, appendLog bool)) *MockLogStorer_StoreLog_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.Log)) + run(args[0].(context.Context), args[1].(types.Log), args[2].(bool)) }) return _c } @@ -68,7 +69,7 @@ func (_c *MockLogStorer_StoreLog_Call) Return(_a0 storer.LogStorageError) *MockL return _c } -func (_c *MockLogStorer_StoreLog_Call) RunAndReturn(run func(context.Context, types.Log) storer.LogStorageError) *MockLogStorer_StoreLog_Call { +func (_c *MockLogStorer_StoreLog_Call) RunAndReturn(run func(context.Context, types.Log, bool) storer.LogStorageError) *MockLogStorer_StoreLog_Call { _c.Call.Return(run) return _c } diff --git a/pkg/proto/identity/api/v1/identity.pb.go b/pkg/proto/identity/api/v1/identity.pb.go index cb27e311..8f939bf1 100644 --- a/pkg/proto/identity/api/v1/identity.pb.go +++ b/pkg/proto/identity/api/v1/identity.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: identity/api/v1/identity.proto diff --git a/pkg/proto/identity/associations/association.pb.go b/pkg/proto/identity/associations/association.pb.go index 766fccf4..fe108d8b 100644 --- a/pkg/proto/identity/associations/association.pb.go +++ b/pkg/proto/identity/associations/association.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: identity/associations/association.proto diff --git a/pkg/proto/identity/associations/signature.pb.go b/pkg/proto/identity/associations/signature.pb.go index 1ce6b8a9..f128a1c7 100644 --- a/pkg/proto/identity/associations/signature.pb.go +++ b/pkg/proto/identity/associations/signature.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: identity/associations/signature.proto diff --git a/pkg/proto/identity/credential.pb.go b/pkg/proto/identity/credential.pb.go index 0f472a0d..5cf2a850 100644 --- a/pkg/proto/identity/credential.pb.go +++ b/pkg/proto/identity/credential.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: identity/credential.proto diff --git a/pkg/proto/keystore_api/v1/keystore.pb.go b/pkg/proto/keystore_api/v1/keystore.pb.go index 0ac5e8aa..812ae219 100644 --- a/pkg/proto/keystore_api/v1/keystore.pb.go +++ b/pkg/proto/keystore_api/v1/keystore.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: keystore_api/v1/keystore.proto diff --git a/pkg/proto/message_api/v1/authn.pb.go b/pkg/proto/message_api/v1/authn.pb.go index f05daf9a..209cc047 100644 --- a/pkg/proto/message_api/v1/authn.pb.go +++ b/pkg/proto/message_api/v1/authn.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_api/v1/authn.proto diff --git a/pkg/proto/message_api/v1/message_api.pb.go b/pkg/proto/message_api/v1/message_api.pb.go index 5acffd14..aadaeee7 100644 --- a/pkg/proto/message_api/v1/message_api.pb.go +++ b/pkg/proto/message_api/v1/message_api.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_api/v1/message_api.proto diff --git a/pkg/proto/message_contents/ciphertext.pb.go b/pkg/proto/message_contents/ciphertext.pb.go index 44cc0f92..e7aa5778 100644 --- a/pkg/proto/message_contents/ciphertext.pb.go +++ b/pkg/proto/message_contents/ciphertext.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/ciphertext.proto diff --git a/pkg/proto/message_contents/composite.pb.go b/pkg/proto/message_contents/composite.pb.go index c51944ae..860dbc79 100644 --- a/pkg/proto/message_contents/composite.pb.go +++ b/pkg/proto/message_contents/composite.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/composite.proto diff --git a/pkg/proto/message_contents/contact.pb.go b/pkg/proto/message_contents/contact.pb.go index 650de9e3..394697d8 100644 --- a/pkg/proto/message_contents/contact.pb.go +++ b/pkg/proto/message_contents/contact.pb.go @@ -6,7 +6,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/contact.proto diff --git a/pkg/proto/message_contents/content.pb.go b/pkg/proto/message_contents/content.pb.go index b625cd2e..6da8674d 100644 --- a/pkg/proto/message_contents/content.pb.go +++ b/pkg/proto/message_contents/content.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/content.proto diff --git a/pkg/proto/message_contents/conversation_reference.pb.go b/pkg/proto/message_contents/conversation_reference.pb.go index 8ca08eac..7bac4f8d 100644 --- a/pkg/proto/message_contents/conversation_reference.pb.go +++ b/pkg/proto/message_contents/conversation_reference.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/conversation_reference.proto diff --git a/pkg/proto/message_contents/ecies.pb.go b/pkg/proto/message_contents/ecies.pb.go index 840d11f0..a36f7fda 100644 --- a/pkg/proto/message_contents/ecies.pb.go +++ b/pkg/proto/message_contents/ecies.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/ecies.proto diff --git a/pkg/proto/message_contents/frames.pb.go b/pkg/proto/message_contents/frames.pb.go index 13f0c0aa..9469c2ec 100644 --- a/pkg/proto/message_contents/frames.pb.go +++ b/pkg/proto/message_contents/frames.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/frames.proto diff --git a/pkg/proto/message_contents/invitation.pb.go b/pkg/proto/message_contents/invitation.pb.go index 001b427d..d9e49b74 100644 --- a/pkg/proto/message_contents/invitation.pb.go +++ b/pkg/proto/message_contents/invitation.pb.go @@ -4,7 +4,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/invitation.proto diff --git a/pkg/proto/message_contents/message.pb.go b/pkg/proto/message_contents/message.pb.go index 32c1939c..4cc6e64e 100644 --- a/pkg/proto/message_contents/message.pb.go +++ b/pkg/proto/message_contents/message.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/message.proto diff --git a/pkg/proto/message_contents/private_key.pb.go b/pkg/proto/message_contents/private_key.pb.go index 1bee3cfd..691f0b18 100644 --- a/pkg/proto/message_contents/private_key.pb.go +++ b/pkg/proto/message_contents/private_key.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/private_key.proto diff --git a/pkg/proto/message_contents/private_preferences.pb.go b/pkg/proto/message_contents/private_preferences.pb.go index 60d8382a..4a64714c 100644 --- a/pkg/proto/message_contents/private_preferences.pb.go +++ b/pkg/proto/message_contents/private_preferences.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/private_preferences.proto diff --git a/pkg/proto/message_contents/public_key.pb.go b/pkg/proto/message_contents/public_key.pb.go index 35e84093..6cce1dc6 100644 --- a/pkg/proto/message_contents/public_key.pb.go +++ b/pkg/proto/message_contents/public_key.pb.go @@ -3,7 +3,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/public_key.proto diff --git a/pkg/proto/message_contents/signature.pb.go b/pkg/proto/message_contents/signature.pb.go index 42b6b3b0..3d5db06a 100644 --- a/pkg/proto/message_contents/signature.pb.go +++ b/pkg/proto/message_contents/signature.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/signature.proto diff --git a/pkg/proto/message_contents/signed_payload.pb.go b/pkg/proto/message_contents/signed_payload.pb.go index 57ecdfdb..4259eb67 100644 --- a/pkg/proto/message_contents/signed_payload.pb.go +++ b/pkg/proto/message_contents/signed_payload.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: message_contents/signed_payload.proto diff --git a/pkg/proto/mls/api/v1/mls.pb.go b/pkg/proto/mls/api/v1/mls.pb.go index 26d2bb11..17e28d69 100644 --- a/pkg/proto/mls/api/v1/mls.pb.go +++ b/pkg/proto/mls/api/v1/mls.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/api/v1/mls.proto diff --git a/pkg/proto/mls/database/intents.pb.go b/pkg/proto/mls/database/intents.pb.go index 07c212c1..0e165b89 100644 --- a/pkg/proto/mls/database/intents.pb.go +++ b/pkg/proto/mls/database/intents.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/database/intents.proto diff --git a/pkg/proto/mls/message_contents/content.pb.go b/pkg/proto/mls/message_contents/content.pb.go index 1f3cb2d3..5b172360 100644 --- a/pkg/proto/mls/message_contents/content.pb.go +++ b/pkg/proto/mls/message_contents/content.pb.go @@ -3,7 +3,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/content.proto diff --git a/pkg/proto/mls/message_contents/content_types/reaction.pb.go b/pkg/proto/mls/message_contents/content_types/reaction.pb.go index 1c678e58..71786351 100644 --- a/pkg/proto/mls/message_contents/content_types/reaction.pb.go +++ b/pkg/proto/mls/message_contents/content_types/reaction.pb.go @@ -11,7 +11,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/content_types/reaction.proto diff --git a/pkg/proto/mls/message_contents/group_membership.pb.go b/pkg/proto/mls/message_contents/group_membership.pb.go index 0541c051..0e4d0dbf 100644 --- a/pkg/proto/mls/message_contents/group_membership.pb.go +++ b/pkg/proto/mls/message_contents/group_membership.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/group_membership.proto diff --git a/pkg/proto/mls/message_contents/group_metadata.pb.go b/pkg/proto/mls/message_contents/group_metadata.pb.go index df358bba..51938ae4 100644 --- a/pkg/proto/mls/message_contents/group_metadata.pb.go +++ b/pkg/proto/mls/message_contents/group_metadata.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/group_metadata.proto diff --git a/pkg/proto/mls/message_contents/group_mutable_metadata.pb.go b/pkg/proto/mls/message_contents/group_mutable_metadata.pb.go index 3d4b8ed7..065c8007 100644 --- a/pkg/proto/mls/message_contents/group_mutable_metadata.pb.go +++ b/pkg/proto/mls/message_contents/group_mutable_metadata.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/group_mutable_metadata.proto diff --git a/pkg/proto/mls/message_contents/group_permissions.pb.go b/pkg/proto/mls/message_contents/group_permissions.pb.go index a8be8d8a..e19d5431 100644 --- a/pkg/proto/mls/message_contents/group_permissions.pb.go +++ b/pkg/proto/mls/message_contents/group_permissions.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/group_permissions.proto diff --git a/pkg/proto/mls/message_contents/transcript_messages.pb.go b/pkg/proto/mls/message_contents/transcript_messages.pb.go index 129bd805..7cf469ef 100644 --- a/pkg/proto/mls/message_contents/transcript_messages.pb.go +++ b/pkg/proto/mls/message_contents/transcript_messages.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls/message_contents/transcript_messages.proto diff --git a/pkg/proto/mls_validation/v1/service.pb.go b/pkg/proto/mls_validation/v1/service.pb.go index ab67d806..295b0365 100644 --- a/pkg/proto/mls_validation/v1/service.pb.go +++ b/pkg/proto/mls_validation/v1/service.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: mls_validation/v1/service.proto diff --git a/pkg/proto/xmtpv4/envelopes/envelopes.pb.go b/pkg/proto/xmtpv4/envelopes/envelopes.pb.go index 1a605230..d6b35971 100644 --- a/pkg/proto/xmtpv4/envelopes/envelopes.pb.go +++ b/pkg/proto/xmtpv4/envelopes/envelopes.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: xmtpv4/envelopes/envelopes.proto diff --git a/pkg/proto/xmtpv4/message_api/message_api.pb.go b/pkg/proto/xmtpv4/message_api/message_api.pb.go index 57adda97..593c91c1 100644 --- a/pkg/proto/xmtpv4/message_api/message_api.pb.go +++ b/pkg/proto/xmtpv4/message_api/message_api.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: xmtpv4/message_api/message_api.proto diff --git a/pkg/proto/xmtpv4/message_api/misbehavior_api.pb.go b/pkg/proto/xmtpv4/message_api/misbehavior_api.pb.go index 27acd356..589002d1 100644 --- a/pkg/proto/xmtpv4/message_api/misbehavior_api.pb.go +++ b/pkg/proto/xmtpv4/message_api/misbehavior_api.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: xmtpv4/message_api/misbehavior_api.proto diff --git a/pkg/proto/xmtpv4/payer_api/payer_api.pb.go b/pkg/proto/xmtpv4/payer_api/payer_api.pb.go index ca99649b..7376da14 100644 --- a/pkg/proto/xmtpv4/payer_api/payer_api.pb.go +++ b/pkg/proto/xmtpv4/payer_api/payer_api.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.2 +// protoc-gen-go v1.36.3 // protoc (unknown) // source: xmtpv4/payer_api/payer_api.proto From 743cfa8a218e00a0b0bdc9837298a22db6ca0731 Mon Sep 17 00:00:00 2001 From: Borja Aranda Date: Thu, 16 Jan 2025 17:12:51 +0100 Subject: [PATCH 6/6] handle channel closures in writer --- pkg/blockchain/rpcLogStreamer.go | 2 +- pkg/indexer/indexer.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/blockchain/rpcLogStreamer.go b/pkg/blockchain/rpcLogStreamer.go index 73c435a7..64e127d6 100644 --- a/pkg/blockchain/rpcLogStreamer.go +++ b/pkg/blockchain/rpcLogStreamer.go @@ -129,7 +129,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { logger := r.logger.With(zap.String("contractAddress", watcher.contractAddress.Hex())) startTime := time.Now() defer close(watcher.eventChannel) - defer close(watcher.reorgChannel) + for { select { case <-r.ctx.Done(): diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 3ae811c7..2d78ffa4 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -46,14 +46,20 @@ func NewIndexer( } } -func (s *Indexer) Close() { - s.log.Debug("Closing") - if s.streamer != nil { - s.streamer.streamer.Stop() +func (i *Indexer) Close() { + i.log.Debug("Closing") + if i.streamer != nil { + if i.streamer.messagesReorgChannel != nil { + close(i.streamer.messagesReorgChannel) + } + if i.streamer.identityUpdatesReorgChannel != nil { + close(i.streamer.identityUpdatesReorgChannel) + } + i.streamer.streamer.Stop() } - s.cancel() - s.wg.Wait() - s.log.Debug("Closed") + i.cancel() + i.wg.Wait() + i.log.Debug("Closed") } func (i *Indexer) StartIndexer(