Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Generic DB for listener #2129

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions ethergo/chain/listener/db/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package db

Check warning on line 1 in ethergo/chain/listener/db/service.go

View workflow job for this annotation

GitHub Actions / Lint (ethergo)

package-comments: should have a package comment (revive)

import (
"context"
"gorm.io/gorm"
"time"
)

// ChainListenerDB is the interface for the chain listener database.
type ChainListenerDB interface {
// PutLatestBlock upsers the latest block on a given chain id to be new height.
PutLatestBlock(ctx context.Context, chainID, height uint64) error
// LatestBlockForChain gets the latest block for a given chain id.
// will return ErrNoLatestBlockForChainID if no block exists for the chain.
LatestBlockForChain(ctx context.Context, chainID uint64) (uint64, error)
}

// LastIndexed is used to make sure we haven't missed any events while offline.
// since we event source - rather than use a state machine this is needed to make sure we haven't missed any events
// by allowing us to go back and source any events we may have missed.
//
// this does not inherit from gorm.model to allow us to use ChainID as a primary key.
type LastIndexed struct {
// CreatedAt is the creation time
CreatedAt time.Time
// UpdatedAt is the update time
UpdatedAt time.Time
// DeletedAt time
DeletedAt gorm.DeletedAt `gorm:"index"`
// ChainID is the chain id of the chain we're watching blocks on. This is our primary index.
ChainID uint64 `gorm:"column:chain_id;primaryKey;autoIncrement:false"`
// BlockHeight is the highest height we've seen on the chain
BlockNumber int `gorm:"block_number"`
}

// GetAllModels gets all models to migrate
// see: https://medium.com/@SaifAbid/slice-interfaces-8c78f8b6345d for an explanation of why we can't do this at initialization time
func GetAllModels() (allModels []interface{}) {
allModels = []interface{}{&LastIndexed{}}
return allModels
}
71 changes: 71 additions & 0 deletions ethergo/chain/listener/db/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package db

Check failure on line 1 in ethergo/chain/listener/db/store.go

View workflow job for this annotation

GitHub Actions / Lint (ethergo)

ST1000: at least one file in a package should have a package comment (stylecheck)

import (
"context"
"errors"
"fmt"
"github.com/synapsecns/sanguine/core/dbcommon"
"github.com/synapsecns/sanguine/core/metrics"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

// NewChainListenerStore creates a new transaction store.
func NewChainListenerStore(db *gorm.DB, metrics metrics.Handler) *Store {
return &Store{
db: db,
metrics: metrics,
}
}

// Store is the sqlite store. It extends the base store for sqlite specific queries.
type Store struct {
db *gorm.DB
metrics metrics.Handler
}

// PutLatestBlock upserts the latest block into the database.
func (s Store) PutLatestBlock(ctx context.Context, chainID, height uint64) error {
tx := s.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: chainIDFieldName}},
DoUpdates: clause.AssignmentColumns([]string{chainIDFieldName, blockNumberFieldName}),
}).Create(&LastIndexed{
ChainID: chainID,
BlockNumber: int(height),
})

if tx.Error != nil {
return fmt.Errorf("could not block updated: %w", tx.Error)
}
return nil
}

// LatestBlockForChain gets the latest block for a chain.
func (s Store) LatestBlockForChain(ctx context.Context, chainID uint64) (uint64, error) {
blockWatchModel := LastIndexed{ChainID: chainID}
err := s.db.WithContext(ctx).First(&blockWatchModel).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return 0, ErrNoLatestBlockForChainID
}
return 0, fmt.Errorf("could not fetch latest block: %w", err)
}

return uint64(blockWatchModel.BlockNumber), nil
}

func init() {
namer := dbcommon.NewNamer(GetAllModels())
chainIDFieldName = namer.GetConsistentName("ChainID")
blockNumberFieldName = namer.GetConsistentName("BlockNumber")
}

var (
// chainIDFieldName gets the chain id field name.
chainIDFieldName string
// blockNumberFieldName is the name of the block number field.
blockNumberFieldName string
)

// ErrNoLatestBlockForChainID is returned when no block exists for the chain.
var ErrNoLatestBlockForChainID = errors.New("no latest block for chainId")
5 changes: 2 additions & 3 deletions ethergo/chain/listener/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package listener

import (
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/ethergo/chain/listener/db"
"github.com/synapsecns/sanguine/ethergo/client"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
)

// TestChainListener wraps chain listener for testing.
Expand All @@ -24,7 +23,7 @@ type TestChainListenerArgs struct {
Address common.Address
InitialBlock uint64
Client client.EVM
Store reldb.Service
Store db.ChainListenerDB
Handler metrics.Handler
}

Expand Down
56 changes: 36 additions & 20 deletions ethergo/chain/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
"github.com/synapsecns/sanguine/ethergo/chain/listener/db"
"math/big"
"time"

Expand All @@ -14,9 +15,9 @@
"github.com/jpillora/backoff"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/ethergo/client"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

// ContractListener listens for chain events and calls HandleLog.
Expand All @@ -38,7 +39,7 @@
client client.EVM
address common.Address
initialBlock uint64
store reldb.Service
store db.ChainListenerDB
handler metrics.Handler
backoff *backoff.Backoff
// IMPORTANT! These fields cannot be used until they has been set. They are NOT
Expand All @@ -48,10 +49,14 @@
// latestBlock uint64
}

var logger = log.Logger("chainlistener-logger")
var (
logger = log.Logger("chainlistener-logger")
// ErrNoLatestBlockForChainID is returned when no block exists for the chain.
ErrNoLatestBlockForChainID = db.ErrNoLatestBlockForChainID
)

// NewChainListener creates a new chain listener.
func NewChainListener(omnirpcClient client.EVM, store reldb.Service, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) {
func NewChainListener(omnirpcClient client.EVM, store db.ChainListenerDB, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) {
return &chainListener{
handler: handler,
address: address,
Expand Down Expand Up @@ -166,23 +171,34 @@

// TODO: consider some kind of backoff here in case rpcs are down at boot.
// this becomes more of an issue as we add more chains
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do
// an rpc call in order to get the chain id
//
rpcChainID, err := c.client.ChainID(ctx)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do
// an rpc call in order to get the chain id
//
rpcChainID, err := c.client.ChainID(ctx)
if err != nil {
return fmt.Errorf("could not get chain ID: %w", err)
}

Check warning on line 184 in ethergo/chain/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/chain/listener/listener.go#L183-L184

Added lines #L183 - L184 were not covered by tests
chainID = rpcChainID.Uint64()

lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID)
if errors.Is(err, ErrNoLatestBlockForChainID) {
// TODO: consider making this negative 1, requires type change
lastIndexed = 0
return nil
}
if err != nil {
return fmt.Errorf("could not get the latest block for chainID: %w", err)
}

Check warning on line 195 in ethergo/chain/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/chain/listener/listener.go#L194-L195

Added lines #L194 - L195 were not covered by tests
return nil
})

err = g.Wait()
if err != nil {
return 0, 0, fmt.Errorf("could not get chain ID: %w", err)
}
chainID = rpcChainID.Uint64()

lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID)
if errors.Is(err, reldb.ErrNoLatestBlockForChainID) {
// TODO: consider making this negative 1, requires type change
lastIndexed = 0
} else if err != nil {
return 0, 0, fmt.Errorf("could not get the latest block for chainID: %w", err)
return 0, 0, fmt.Errorf("could not get metadata: %w", err)

Check warning on line 201 in ethergo/chain/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/chain/listener/listener.go#L201

Added line #L201 was not covered by tests
}

if lastIndexed > c.startBlock {
Expand Down
66 changes: 62 additions & 4 deletions ethergo/chain/listener/suite_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package listener_test

import (
"context"
"fmt"
"github.com/brianvoe/gofakeit/v6"
"github.com/ipfs/go-log"
common_base "github.com/synapsecns/sanguine/core/dbcommon"
"github.com/synapsecns/sanguine/ethergo/chain/listener/db"
"github.com/synapsecns/sanguine/ethergo/submitter/db/txdb"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"math/big"
"os"
"testing"
"time"

"github.com/Flaque/filet"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -14,9 +25,8 @@ import (
"github.com/synapsecns/sanguine/ethergo/chain/listener"
"github.com/synapsecns/sanguine/ethergo/contracts"
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb/sqlite"
"github.com/synapsecns/sanguine/services/rfq/testutil"
"gorm.io/driver/sqlite"
)

const chainID = 10
Expand All @@ -25,7 +35,7 @@ type ListenerTestSuite struct {
*testsuite.TestSuite
manager *testutil.DeployManager
backend backends.SimulatedTestBackend
store reldb.Service
store db.ChainListenerDB
metrics metrics.Handler
fastBridge *fastbridge.FastBridgeRef
fastBridgeMetadata contracts.DeployedContract
Expand All @@ -48,7 +58,7 @@ func (l *ListenerTestSuite) SetupTest() {
l.backend = geth.NewEmbeddedBackendForChainID(l.GetTestContext(), l.T(), big.NewInt(chainID))
var err error
l.metrics = metrics.NewNullHandler()
l.store, err = sqlite.NewSqliteStore(l.GetTestContext(), filet.TmpDir(l.T(), ""), l.metrics)
l.store, err = NewSqliteStore(l.GetTestContext(), filet.TmpDir(l.T(), ""), l.metrics)
l.Require().NoError(err)

l.fastBridgeMetadata, l.fastBridge = l.manager.GetFastBridge(l.GetTestContext(), l.backend)
Expand Down Expand Up @@ -96,3 +106,51 @@ func (l *ListenerTestSuite) TestStartBlock() {
func (l *ListenerTestSuite) TestListen() {

}

// NewSqliteStore creates a new sqlite data store.
func NewSqliteStore(parentCtx context.Context, dbPath string, handler metrics.Handler) (_ *db.Store, err error) {
logger := log.Logger("sqlite-store")

logger.Debugf("creating sqlite store at %s", dbPath)

ctx, span := handler.Tracer().Start(parentCtx, "start-sqlite")
defer func() {
metrics.EndSpanWithErr(span, err)
}()

// create the directory to the store if it doesn't exist
err = os.MkdirAll(dbPath, os.ModePerm)
if err != nil {
return nil, fmt.Errorf("could not create sqlite store")
}

logger.Warnf("submitter database is at %s/synapse.db", dbPath)

namingStrategy := schema.NamingStrategy{
TablePrefix: fmt.Sprintf("test%d_%d_", gofakeit.Int64(), time.Now().Unix()),
}

gdb, err := gorm.Open(sqlite.Open(fmt.Sprintf("%s/%s", dbPath, "synapse.db")), &gorm.Config{
DisableForeignKeyConstraintWhenMigrating: true,
Logger: common_base.GetGormLogger(logger),
FullSaveAssociations: true,
SkipDefaultTransaction: true,
NamingStrategy: namingStrategy,
})
if err != nil {
return nil, fmt.Errorf("could not connect to db %s: %w", dbPath, err)
}

err = gdb.AutoMigrate(&db.LastIndexed{})
if err != nil {
return nil, fmt.Errorf("could not migrate models: %w", err)
}

handler.AddGormCallbacks(gdb)

err = gdb.WithContext(ctx).AutoMigrate(txdb.GetAllModels()...)
if err != nil {
return nil, fmt.Errorf("could not migrate models: %w", err)
}
return db.NewChainListenerStore(gdb, handler), nil
}
40 changes: 0 additions & 40 deletions services/rfq/relayer/reldb/base/block.go

This file was deleted.

Loading
Loading