Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon: Apply RoTx limit #11963

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
onNewSnapshot := func() {}
roTxLimit := int64(cfg.DBReadConcurrency)

var cc *chain.Config

Expand All @@ -363,7 +364,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Accede mode will check db existence (may wait with retries). It's ok to fail in this case - some supervisor will restart us.
var rwKv kv.RwDB
logger.Warn("Opening chain db", "path", cfg.Dirs.Chaindata)
limiter := semaphore.NewWeighted(int64(cfg.DBReadConcurrency))
limiter := semaphore.NewWeighted(roTxLimit)
rwKv, err = kv2.NewMDBX(logger).RoTxsLimiter(limiter).Path(cfg.Dirs.Chaindata).Accede().Open(ctx)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
Expand Down Expand Up @@ -504,12 +505,12 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
if cc != nil && cc.Bor != nil {
if polygonSync {
stateReceiverContractAddress := cc.Bor.GetStateReceiverContract()
bridgeReader, err = bridge.AssembleReader(ctx, cfg.DataDir, logger, stateReceiverContractAddress)
bridgeReader, err = bridge.AssembleReader(ctx, cfg.DataDir, logger, stateReceiverContractAddress, roTxLimit)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}

heimdallReader, err = heimdall.AssembleReader(ctx, cc.Bor.CalculateSprintNumber, cfg.DataDir, cfg.Dirs.Tmp, logger)
heimdallReader, err = heimdall.AssembleReader(ctx, cc.Bor.CalculateSprintNumber, cfg.DataDir, cfg.Dirs.Tmp, logger, roTxLimit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to combine cfg.DataDir, cfg.Dirs.Tmp, logger and roTxLimit to a DBConfig struct to avoid adding yet another parameter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I agree with Elton, I actually raised the same thing in a previous PR - #11924 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I even suggest we have separate config structs per component like

  • heimdall.ReaderConfig and bridge.ReaderConfig
  • heimdall.ServiceConfig and bridge.ServiceConfig
  • etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to keep this productive though, maybe this PR can be merged and these improvements can be done in a follow up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'm approving and leaving it up to Shoham.

if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

if config.PolygonSync {
borConfig := consensusConfig.(*borcfg.BorConfig)
polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient)
heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger)
roTxLimit := int64(stack.Config().Http.DBReadConcurrency)

polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient, roTxLimit)
heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger, roTxLimit)
bridgeRPC = bridge.NewBackendServer(ctx, polygonBridge)

backend.polygonBridge = polygonBridge
Expand Down
4 changes: 2 additions & 2 deletions polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type eventFetcher interface {
FetchStateSyncEvents(ctx context.Context, fromId uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, error)
}

func Assemble(dataDir string, logger log.Logger, borConfig *borcfg.BorConfig, eventFetcher eventFetcher) *Bridge {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, false /* accede */)
func Assemble(dataDir string, logger log.Logger, borConfig *borcfg.BorConfig, eventFetcher eventFetcher, roTxLimit int64) *Bridge {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, false /* accede */, roTxLimit)
bridgeStore := NewStore(bridgeDB)
reader := NewReader(bridgeStore, logger, borConfig.StateReceiverContract)
return NewBridge(bridgeStore, logger, borConfig, eventFetcher, reader)
Expand Down
2 changes: 1 addition & 1 deletion polygon/bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func setup(t *testing.T, borConfig borcfg.BorConfig) (*heimdall.MockHeimdallClie
ctrl := gomock.NewController(t)
logger := testlog.Logger(t, log.LvlDebug)
heimdallClient := heimdall.NewMockHeimdallClient(ctrl)
b := Assemble(t.TempDir(), logger, &borConfig, heimdallClient)
b := Assemble(t.TempDir(), logger, &borConfig, heimdallClient, 1)
t.Cleanup(b.Close)
return heimdallClient, b
}
Expand Down
4 changes: 2 additions & 2 deletions polygon/bridge/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type Reader struct {
stateClientAddress libcommon.Address
}

func AssembleReader(ctx context.Context, dataDir string, logger log.Logger, stateReceiverContractAddress string) (*Reader, error) {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, true /* accede */)
func AssembleReader(ctx context.Context, dataDir string, logger log.Logger, stateReceiverContractAddress string, roTxLimit int64) (*Reader, error) {
bridgeDB := polygoncommon.NewDatabase(dataDir, kv.PolygonBridgeDB, databaseTablesCfg, logger, true /* accede */, roTxLimit)
bridgeStore := NewStore(bridgeDB)

err := bridgeStore.Prepare(ctx)
Expand Down
4 changes: 2 additions & 2 deletions polygon/heimdall/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type Reader struct {
}

// AssembleReader creates and opens the MDBX store. For use cases where the store is only being read from. Must call Close.
func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger) (*Reader, error) {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)
func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger, roTxLimit int64) (*Reader, error) {
store := NewMdbxServiceStore(logger, dataDir, tmpDir, roTxLimit)

err := store.Prepare(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type service struct {
spanBlockProducersTracker *spanBlockProducersTracker
}

func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)
func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger, roTxLimit int64) Service {
store := NewMdbxServiceStore(logger, dataDir, tmpDir, roTxLimit)
client := NewHeimdallClient(heimdallUrl, logger)
reader := NewReader(calculateSprintNumberFn, store, logger)
return NewService(calculateSprintNumberFn, client, store, logger, reader)
Expand Down
4 changes: 2 additions & 2 deletions polygon/heimdall/service_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type ServiceStore interface {
Close()
}

func NewMdbxServiceStore(logger log.Logger, dataDir string, tmpDir string) *MdbxServiceStore {
db := polygoncommon.NewDatabase(dataDir, kv.HeimdallDB, databaseTablesCfg, logger, false /* accede */)
func NewMdbxServiceStore(logger log.Logger, dataDir string, tmpDir string, roTxLimit int64) *MdbxServiceStore {
db := polygoncommon.NewDatabase(dataDir, kv.HeimdallDB, databaseTablesCfg, logger, false /* accede */, roTxLimit)
blockNumToIdIndexFactory := func(ctx context.Context) (*RangeIndex, error) {
return NewRangeIndex(ctx, tmpDir, logger)
}
Expand Down
2 changes: 1 addition & 1 deletion polygon/heimdall/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (suite *ServiceTestSuite) SetupSuite() {
tempDir := suite.T().TempDir()
dataDir := fmt.Sprintf("%s/datadir", tempDir)
logger := testlog.Logger(suite.T(), log.LvlCrit)
store := NewMdbxServiceStore(logger, dataDir, tempDir)
store := NewMdbxServiceStore(logger, dataDir, tempDir, 1)
borConfig := params.AmoyChainConfig.Bor.(*borcfg.BorConfig)
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.client = NewMockHeimdallClient(ctrl)
Expand Down
32 changes: 18 additions & 14 deletions polygon/polygoncommon/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,32 @@ import (
"sync"

"github.com/c2h5oh/datasize"
"golang.org/x/sync/semaphore"

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
)

type Database struct {
db kv.RwDB
dataDir string
label kv.Label
tableCfg kv.TableCfg
openOnce sync.Once
logger log.Logger
accede bool
db kv.RwDB
dataDir string
label kv.Label
tableCfg kv.TableCfg
openOnce sync.Once
logger log.Logger
accede bool
roTxLimit int64
}

func NewDatabase(dataDir string, label kv.Label, tableCfg kv.TableCfg, logger log.Logger, accede bool) *Database {
func NewDatabase(dataDir string, label kv.Label, tableCfg kv.TableCfg, logger log.Logger, accede bool, roTxLimit int64) *Database {
return &Database{
dataDir: dataDir,
label: label,
tableCfg: tableCfg,
logger: logger,
accede: accede,
dataDir: dataDir,
label: label,
tableCfg: tableCfg,
logger: logger,
accede: accede,
roTxLimit: roTxLimit,
}
}

Expand All @@ -58,7 +61,8 @@ func (db *Database) open(ctx context.Context) error {
Path(dbPath).
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return db.tableCfg }).
MapSize(16 * datasize.GB).
GrowthStep(16 * datasize.MB)
GrowthStep(16 * datasize.MB).
RoTxsLimiter(semaphore.NewWeighted(db.roTxLimit))

if db.accede {
opts = opts.Accede()
Expand Down
Loading