Skip to content

Commit

Permalink
update models, queries and storers
Browse files Browse the repository at this point in the history
  • Loading branch information
fbac committed Jan 16, 2025
1 parent 3603d02 commit 7e6a14e
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 29 deletions.
1 change: 1 addition & 0 deletions pkg/blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ChainClient interface {
ethereum.BlockNumberReader
ethereum.LogFilterer
ethereum.ChainIDReader
ethereum.ChainReader
}

type TransactionSigner interface {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type ContractsOptions struct {
ChainID int `long:"chain-id" env:"XMTPD_CONTRACTS_CHAIN_ID" description:"Chain ID for the appchain" default:"31337"`
RefreshInterval time.Duration `long:"refresh-interval" env:"XMTPD_CONTRACTS_REFRESH_INTERVAL" description:"Refresh interval for the nodes registry" default:"60s"`
MaxChainDisconnectTime time.Duration `long:"max-chain-disconnect-time" env:"XMTPD_CONTRACTS_MAX_CHAIN_DISCONNECT_TIME" description:"Maximum time to allow the node to operate while disconnected" default:"300s"`
// TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume
SafeBlockDistance uint64 `long:"safe-block-distance" env:"XMTPD_CONTRACTS_SAFE_BLOCK_DISTANCE" description:"Safe block distance" default:"0"`
}

type DbOptions struct {
Expand Down
20 changes: 20 additions & 0 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,23 @@ FROM
WHERE
contract_address = @contract_address;

-- name: GetEnvelopeVersion :one
SELECT
version
FROM
gateway_envelopes
WHERE
originator_node_id = $1
AND originator_sequence_id = $2
AND is_canonical = TRUE;

-- name: InvalidateEnvelope :exec
UPDATE
gateway_envelopes
SET
is_canonical = FALSE
WHERE
originator_node_id = $1
AND originator_sequence_id = $2
AND is_canonical = TRUE;

6 changes: 3 additions & 3 deletions pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 47 additions & 3 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 11 additions & 7 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (i *Indexer) StartIndexer(
client,
streamer.messagesChannel,
streamer.messagesReorgChannel,
cfg.SafeBlockDistance,
indexingLogger,
storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract),
streamer.messagesBlockTracker,
Expand All @@ -115,6 +116,7 @@ func (i *Indexer) StartIndexer(
client,
streamer.identityUpdatesChannel,
streamer.identityUpdatesReorgChannel,
cfg.SafeBlockDistance,
indexingLogger,
storer.NewIdentityUpdateStorer(
db,
Expand Down Expand Up @@ -207,22 +209,24 @@ The only non-retriable errors should be things like malformed events or failed v
*/
func indexLogs(
ctx context.Context,
client *ethclient.Client,
client blockchain.ChainClient,
eventChannel <-chan types.Log,
reorgChannel chan<- uint64,
safeBlockDistance uint64,
logger *zap.Logger,
logStorer storer.LogStorer,
blockTracker IBlockTracker,
) {
var errStorage storer.LogStorageError

// We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled
for event := range eventChannel {
storedBlockNumber, storedBlockHash := blockTracker.GetLatestBlock()

// TODO: Calculate the blocks safe distance in the L3 or risk tolerance we assume
// idea - SafeBlockDistance uint64 `env:"SAFE_BLOCK_DISTANCE" envDefault:"100"`
if event.BlockNumber > storedBlockNumber && event.BlockNumber-storedBlockNumber < 100 {
latestBlockHash, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
if event.BlockNumber > storedBlockNumber &&
event.BlockNumber-storedBlockNumber < safeBlockDistance {
latestBlock, err := client.BlockByNumber(ctx, big.NewInt(int64(storedBlockNumber)))
if err != nil {
logger.Error("error getting block",
zap.Uint64("blockNumber", storedBlockNumber),
Expand All @@ -232,11 +236,11 @@ func indexLogs(
continue
}

if !bytes.Equal(storedBlockHash, latestBlockHash.Hash().Bytes()) {
if !bytes.Equal(storedBlockHash, latestBlock.Hash().Bytes()) {
logger.Warn("blockchain reorg detected",
zap.Uint64("storedBlockNumber", storedBlockNumber),
zap.String("storedBlockHash", hex.EncodeToString(storedBlockHash)),
zap.String("onchainBlockHash", latestBlockHash.Hash().String()),
zap.String("onchainBlockHash", latestBlock.Hash().String()),
)

// TODO: Implement reorg handling:
Expand All @@ -256,7 +260,7 @@ func indexLogs(

Retry:
for {
errStorage = logStorer.StoreLog(ctx, event)
errStorage = logStorer.StoreLog(ctx, event, false)
if errStorage != nil {
logger.Error("error storing log", zap.Error(errStorage))
if errStorage.ShouldRetry() {
Expand Down
46 changes: 39 additions & 7 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storer

import (
"context"
"database/sql"
"errors"

"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -13,6 +14,11 @@ import (
"google.golang.org/protobuf/proto"
)

const (
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
GROUP_MESSAGE_ORIGINATOR_ID = 0
)

type GroupMessageStorer struct {
contract *groupmessages.GroupMessages
queries *queries.Queries
Expand All @@ -32,7 +38,11 @@ func NewGroupMessageStorer(
}

// Validate and store a group message log event
func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
func (s *GroupMessageStorer) StoreLog(
ctx context.Context,
event types.Log,
appendLog bool,
) LogStorageError {
msgSent, err := s.contract.ParseMessageSent(event)
if err != nil {
return NewLogStorageError(err, false)
Expand Down Expand Up @@ -75,15 +85,37 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS
return NewLogStorageError(err, false)
}

version := sql.NullInt32{Int32: 1, Valid: true}

if appendLog {
version, err = GetVersionForAppend(
ctx,
s.queries,
s.logger,
GROUP_MESSAGE_ORIGINATOR_ID,
int64(msgSent.SequenceId),
)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return NewLogStorageError(err, true)
}
if errors.Is(err, sql.ErrNoRows) {
s.logger.Debug("No rows found for envelope, inserting new",
zap.Int("originator_node_id", GROUP_MESSAGE_ORIGINATOR_ID),
zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
)
}
}
}

s.logger.Debug("Inserting message from contract", zap.String("topic", topicStruct.String()))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
BlockNumber: int64(event.BlockNumber),
BlockHash: event.BlockHash.Bytes(),
Version: 1, // TODO: Make this dynamic
IsCanonical: true, // TODO: Make this dynamic
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
OriginatorNodeID: 0,
BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true},
BlockHash: event.BlockHash.Bytes(),
Version: version,
IsCanonical: sql.NullBool{Bool: true, Valid: true},
OriginatorNodeID: GROUP_MESSAGE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: topicStruct.Bytes(),
OriginatorEnvelope: originatorEnvelopeBytes,
Expand Down
36 changes: 34 additions & 2 deletions pkg/indexer/storer/identityUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storer
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -23,6 +24,7 @@ import (
)

const (
// We may not want to hardcode this to 1 and have an originator ID for each smart contract?
IDENTITY_UPDATE_ORIGINATOR_ID = 1
)

Expand All @@ -48,7 +50,11 @@ func NewIdentityUpdateStorer(
}

// Validate and store an identity update log event
func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
func (s *IdentityUpdateStorer) StoreLog(
ctx context.Context,
event types.Log,
appendLog bool,
) LogStorageError {
msgSent, err := s.contract.ParseIdentityUpdateCreated(event)
if err != nil {
return NewLogStorageError(err, false)
Expand Down Expand Up @@ -166,8 +172,34 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo
return NewLogStorageError(err, true)
}

version := sql.NullInt32{Int32: 1, Valid: true}

if appendLog {
version, err = GetVersionForAppend(
ctx,
querier,
s.logger,
IDENTITY_UPDATE_ORIGINATOR_ID,
int64(msgSent.SequenceId),
)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return NewLogStorageError(err, true)
}
if errors.Is(err, sql.ErrNoRows) {
s.logger.Debug("No rows found for envelope, inserting new",
zap.Int("originator_node_id", IDENTITY_UPDATE_ORIGINATOR_ID),
zap.Int64("originator_sequence_id", int64(msgSent.SequenceId)),
)
}
}
}

if _, err = querier.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
// We may not want to hardcode this to 1 and have an originator ID for each smart contract?
BlockNumber: sql.NullInt64{Int64: int64(event.BlockNumber), Valid: true},
BlockHash: event.BlockHash.Bytes(),
Version: version,
IsCanonical: sql.NullBool{Bool: true, Valid: true},
OriginatorNodeID: IDENTITY_UPDATE_ORIGINATOR_ID,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: messageTopic.Bytes(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/indexer/storer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (

// Takes a log event and stores it, returning either an error that may be retriable, non-retriable, or nil
type LogStorer interface {
StoreLog(ctx context.Context, event types.Log) LogStorageError
StoreLog(ctx context.Context, event types.Log, appendLog bool) LogStorageError
}
49 changes: 49 additions & 0 deletions pkg/indexer/storer/logAppend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package storer

import (
"context"
"database/sql"
"errors"

"github.com/xmtp/xmtpd/pkg/db/queries"
"go.uber.org/zap"
)

func GetVersionForAppend(
ctx context.Context,
querier *queries.Queries,
logger *zap.Logger,
originatorNodeID int32,
sequenceID int64,
) (sql.NullInt32, error) {
var version sql.NullInt32
currentVersion, err := querier.GetEnvelopeVersion(ctx, queries.GetEnvelopeVersionParams{
OriginatorNodeID: originatorNodeID,
OriginatorSequenceID: sequenceID,
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
logger.Error("Error getting current version", zap.Error(err))
return version, err
}

if errors.Is(err, sql.ErrNoRows) {
return version, err
}

if err == nil {
if err = querier.InvalidateEnvelope(ctx, queries.InvalidateEnvelopeParams{
OriginatorNodeID: originatorNodeID,
OriginatorSequenceID: sequenceID,
}); err != nil {
logger.Error("Error invalidating old envelope", zap.Error(err))
return version, err
}

version = sql.NullInt32{
Int32: currentVersion.Int32 + 1,
Valid: true,
}
}

return version, nil
}
Loading

0 comments on commit 7e6a14e

Please sign in to comment.