Skip to content

Commit

Permalink
polygon: Apply RoTx limit (#11963)
Browse files Browse the repository at this point in the history
Fixes #11899
shohamc1 authored Sep 12, 2024
1 parent a77a069 commit c5d3284
Showing 10 changed files with 38 additions and 31 deletions.
7 changes: 4 additions & 3 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
@@ -348,6 +348,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
onNewSnapshot := func() {}
roTxLimit := int64(cfg.DBReadConcurrency)

var cc *chain.Config

@@ -363,7 +364,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Accede mode will check db existence (may wait with retries). It's ok to fail in this case - some supervisor will restart us.
var rwKv kv.RwDB
logger.Warn("Opening chain db", "path", cfg.Dirs.Chaindata)
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
limiter := semaphore.NewWeighted(roTxLimit)
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
@@ -504,12 +505,12 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cc != nil && cc.Bor != nil {
if polygonSync {
stateReceiverContractAddress := cc.Bor.GetStateReceiverContract()
bridgeReader, err = bridge.AssembleReader(ctx, cfg.DataDir, logger, stateReceiverContractAddress)
bridgeReader, err = bridge.AssembleReader(ctx, cfg.DataDir, logger, stateReceiverContractAddress, roTxLimit)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}

heimdallReader, err = heimdall.AssembleReader(ctx, cc.Bor.CalculateSprintNumber, cfg.DataDir, cfg.Dirs.Tmp, logger)
heimdallReader, err = heimdall.AssembleReader(ctx, cc.Bor.CalculateSprintNumber, cfg.DataDir, cfg.Dirs.Tmp, logger, roTxLimit)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}
6 changes: 4 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
@@ -550,8 +550,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

if config.PolygonSync {
borConfig := consensusConfig.(*borcfg.BorConfig)
polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient)
heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger)
roTxLimit := int64(stack.Config().Http.DBReadConcurrency)

polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient, roTxLimit)
heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger, roTxLimit)
bridgeRPC = bridge.NewBackendServer(ctx, polygonBridge)

backend.polygonBridge = polygonBridge
4 changes: 2 additions & 2 deletions polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
@@ -40,8 +40,8 @@ type eventFetcher interface {
FetchStateSyncEvents(ctx context.Context, fromId uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, error)
}

func Assemble(dataDir string, logger log.Logger, borConfig *borcfg.BorConfig, eventFetcher eventFetcher) *Bridge {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, false /* accede */)
func Assemble(dataDir string, logger log.Logger, borConfig *borcfg.BorConfig, eventFetcher eventFetcher, roTxLimit int64) *Bridge {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, false /* accede */, roTxLimit)
bridgeStore := NewStore(bridgeDB)
reader := NewReader(bridgeStore, logger, borConfig.StateReceiverContract)
return NewBridge(bridgeStore, logger, borConfig, eventFetcher, reader)
2 changes: 1 addition & 1 deletion polygon/bridge/bridge_test.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ func setup(t *testing.T, borConfig borcfg.BorConfig) (*heimdall.MockHeimdallClie
ctrl := gomock.NewController(t)
logger := testlog.Logger(t, log.LvlDebug)
heimdallClient := heimdall.NewMockHeimdallClient(ctrl)
b := Assemble(t.TempDir(), logger, &borConfig, heimdallClient)
b := Assemble(t.TempDir(), logger, &borConfig, heimdallClient, 1)
t.Cleanup(b.Close)
return heimdallClient, b
}
4 changes: 2 additions & 2 deletions polygon/bridge/reader.go
Original file line number Diff line number Diff line change
@@ -26,8 +26,8 @@ type Reader struct {
stateClientAddress libcommon.Address
}

func AssembleReader(ctx context.Context, dataDir string, logger log.Logger, stateReceiverContractAddress string) (*Reader, error) {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, true /* accede */)
func AssembleReader(ctx context.Context, dataDir string, logger log.Logger, stateReceiverContractAddress string, roTxLimit int64) (*Reader, error) {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, true /* accede */, roTxLimit)
bridgeStore := NewStore(bridgeDB)

err := bridgeStore.Prepare(ctx)
4 changes: 2 additions & 2 deletions polygon/heimdall/reader.go
Original file line number Diff line number Diff line change
@@ -15,8 +15,8 @@ type Reader struct {
}

// AssembleReader creates and opens the MDBX store. For use cases where the store is only being read from. Must call Close.
func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger) (*Reader, error) {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)
func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger, roTxLimit int64) (*Reader, error) {
store := NewMdbxServiceStore(logger, dataDir, tmpDir, roTxLimit)

err := store.Prepare(ctx)
if err != nil {
4 changes: 2 additions & 2 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
@@ -52,8 +52,8 @@ type service struct {
spanBlockProducersTracker *spanBlockProducersTracker
}

func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)
func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger, roTxLimit int64) Service {
store := NewMdbxServiceStore(logger, dataDir, tmpDir, roTxLimit)
client := NewHeimdallClient(heimdallUrl, logger)
reader := NewReader(calculateSprintNumberFn, store, logger)
return NewService(calculateSprintNumberFn, client, store, logger, reader)
4 changes: 2 additions & 2 deletions polygon/heimdall/service_store.go
Original file line number Diff line number Diff line change
@@ -36,8 +36,8 @@ type ServiceStore interface {
Close()
}

func NewMdbxServiceStore(logger log.Logger, dataDir string, tmpDir string) *MdbxServiceStore {
db := polygoncommon.NewDatabase(dataDir, kv.HeimdallDB, databaseTablesCfg, logger, false /* accede */)
func NewMdbxServiceStore(logger log.Logger, dataDir string, tmpDir string, roTxLimit int64) *MdbxServiceStore {
db := polygoncommon.NewDatabase(dataDir, kv.HeimdallDB, databaseTablesCfg, logger, false /* accede */, roTxLimit)
blockNumToIdIndexFactory := func(ctx context.Context) (*RangeIndex, error) {
return NewRangeIndex(ctx, tmpDir, logger)
}
2 changes: 1 addition & 1 deletion polygon/heimdall/service_test.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ func (suite *ServiceTestSuite) SetupSuite() {
tempDir := suite.T().TempDir()
dataDir := fmt.Sprintf("%s/datadir", tempDir)
logger := testlog.Logger(suite.T(), log.LvlCrit)
store := NewMdbxServiceStore(logger, dataDir, tempDir)
store := NewMdbxServiceStore(logger, dataDir, tempDir, 1)
borConfig := params.AmoyChainConfig.Bor.(*borcfg.BorConfig)
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.client = NewMockHeimdallClient(ctrl)
32 changes: 18 additions & 14 deletions polygon/polygoncommon/database.go
Original file line number Diff line number Diff line change
@@ -22,29 +22,32 @@ import (
"sync"

"github.com/c2h5oh/datasize"
"golang.org/x/sync/semaphore"

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
)

type Database struct {
db kv.RwDB
dataDir string
label kv.Label
tableCfg kv.TableCfg
openOnce sync.Once
logger log.Logger
accede bool
db kv.RwDB
dataDir string
label kv.Label
tableCfg kv.TableCfg
openOnce sync.Once
logger log.Logger
accede bool
roTxLimit int64
}

func NewDatabase(dataDir string, label kv.Label, tableCfg kv.TableCfg, logger log.Logger, accede bool) *Database {
func NewDatabase(dataDir string, label kv.Label, tableCfg kv.TableCfg, logger log.Logger, accede bool, roTxLimit int64) *Database {
return &Database{
dataDir: dataDir,
label: label,
tableCfg: tableCfg,
logger: logger,
accede: accede,
dataDir: dataDir,
label: label,
tableCfg: tableCfg,
logger: logger,
accede: accede,
roTxLimit: roTxLimit,
}
}

@@ -58,7 +61,8 @@ func (db *Database) open(ctx context.Context) error {
Path(dbPath).
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return db.tableCfg }).
MapSize(16 * datasize.GB).
GrowthStep(16 * datasize.MB)
GrowthStep(16 * datasize.MB).
RoTxsLimiter(semaphore.NewWeighted(db.roTxLimit))

if db.accede {
opts = opts.Accede()

0 comments on commit c5d3284

Please sign in to comment.