From be2f5b4f96dfccfffc3b3ecfeb7ddedbb6d2656d Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 3 Oct 2024 00:50:06 -0700 Subject: [PATCH] Remove --parallel-job-size config parameter used for reingestion. --- .../buffered_storage_backend_test.go | 44 +++++++++++++ ingest/ledgerbackend/ledger_buffer.go | 6 ++ services/horizon/cmd/db.go | 22 +------ services/horizon/cmd/db_test.go | 62 ------------------- services/horizon/internal/ingest/parallel.go | 14 ++--- .../horizon/internal/ingest/parallel_test.go | 44 ++++++------- 6 files changed, 77 insertions(+), 115 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 0d461cff07..f183e927ef 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) { assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) } +func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) { + startLedger := uint32(10) + endLedger := uint32(30) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 10 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := startLedger; i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + +func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) { + startLedger := uint32(1) + endLedger := uint32(15) + bsb := createBufferedStorageBackendForTesting() + bsb.config.NumWorkers = 2 + bsb.config.BufferSize = 100 + ledgerRange := BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount) + bsb.dataStore = mockDataStore + + ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange) + assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50) + assert.NoError(t, err) + + for i := uint32(0); i < endLedger; i++ { + lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background()) + assert.NoError(t, err) + assert.Equal(t, xdr.Uint32(startLedger+i), lcm.StartSequence) + } + assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange) +} + func TestBSBGetLatestLedgerSequence(t *testing.T) { startLedger := uint32(3) endLedger := uint32(5) diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go index d23bf0bfbd..7ee9dda083 100644 --- a/ingest/ledgerbackend/ledger_buffer.go +++ b/ingest/ledgerbackend/ledger_buffer.go @@ -13,6 +13,8 @@ import ( "github.com/stellar/go/support/collections/heap" "github.com/stellar/go/support/compressxdr" "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" ) @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu less := func(a, b ledgerBatchObject) bool { return a.startLedger < b.startLedger } + // ensure BufferSize does not exceed the total range + if ledgerRange.bounded { + 
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1)) + } pq := heap.New(less, int(bsb.config.BufferSize)) ledgerBuffer := &ledgerBuffer{ diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 92a732e002..f36c730880 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -42,7 +42,6 @@ var ( dbDetectGapsCmd *cobra.Command reingestForce bool parallelWorkers uint - parallelJobSize uint32 retries uint retryBackoffSeconds uint ledgerBackendStr string @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, - { - Name: "parallel-job-size", - ConfigKey: ¶llelJobSize, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(100000), - Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", - }, { Name: "retries", ConfigKey: &retries, @@ -186,9 +177,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, } maxLedgersPerFlush := ingest.MaxLedgersPerFlush - if parallelJobSize < maxLedgersPerFlush { - maxLedgersPerFlush = parallelJobSize - } ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, @@ -219,10 +207,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return systemErr } - return system.ReingestRange( - ledgerRanges, - parallelJobSize, - ) + return system.ReingestRange(ledgerRanges) } system, systemErr := ingest.NewSystem(ingestConfig) @@ -486,11 +471,6 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { return err } - // when using buffered storage, performance observations have noted optimal parallel batch size - // of 100, apply that as default if the flag was absent. 
- if !viper.IsSet("parallel-job-size") { - parallelJobSize = 100 - } options.NoCaptiveCore = true } diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go index 6a00576bd3..694535492c 100644 --- a/services/horizon/cmd/db_test.go +++ b/services/horizon/cmd/db_test.go @@ -45,68 +45,6 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) { s.rootCmd = NewRootCmd() } -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--parallel-workers", "2", - "--ledgerbackend", "datastore", - "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100)) -} - -func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(100_000)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--stellar-core-binary-path", "/test/core/bin/path", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "captive-core", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) -} - -func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() { - s.rootCmd.SetArgs([]string{ - "db", "reingest", "range", - "--db-url", s.db.DSN, - "--network", "testnet", - "--parallel-workers", "2", - "--parallel-job-size", "5", - "--ledgerbackend", "datastore", - "--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml", - "2", - "10"}) - - require.NoError(s.T(), s.rootCmd.Execute()) - require.Equal(s.T(), parallelJobSize, uint32(5)) -} - func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() { tests := []struct { name string diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 4f07c21cc4..679224b176 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -113,12 +113,10 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, return lowestLedger } -func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { - batchSize := batchSizeSuggestion - if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { - // let's try to make use of all the workers - batchSize = rangeSize / uint32(workerCount) - } +func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 { + // let's try to make use of all the workers + batchSize := rangeSize / uint32(workerCount) + // Use a minimum batch size to make it worth it in terms of overhead if batchSize < minBatchSize { batchSize = minBatchSize @@ -136,9 +134,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 { return sum } -func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error { +func (ps 
*ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error { var ( - batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount) + batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), ps.workerCount) reingestJobQueue = make(chan history.LedgerRange) wg sync.WaitGroup diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 8004a4048c..1a8e04980b 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -15,13 +15,13 @@ import ( ) func TestCalculateParallelLedgerBatchSize(t *testing.T) { - assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1)) + assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 3)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 4)) + assert.Equal(t, uint32(20096), calculateParallelLedgerBatchSize(20096, 1)) } func TestParallelReingestRange(t *testing.T) { @@ -45,15 +45,14 @@ func TestParallelReingestRange(t *testing.T) { } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) assert.NoError(t, err) sort.Slice(rangesCalled, func(i, j int) bool { return rangesCalled[i].StartSequence < rangesCalled[j].StartSequence }) expected := []history.LedgerRange{ - {StartSequence: 1, EndSequence: 256}, {StartSequence: 257, EndSequence: 512}, {StartSequence: 513, EndSequence: 768}, {StartSequence: 769, EndSequence: 1024}, {StartSequence: 1025, EndSequence: 1280}, - {StartSequence: 1281, EndSequence: 1536}, {StartSequence: 1537, EndSequence: 1792}, {StartSequence: 1793, EndSequence: 2048}, {StartSequence: 2049, EndSequence: 2050}, + {StartSequence: 1, EndSequence: 640}, {StartSequence: 641, EndSequence: 1280}, {StartSequence: 1281, EndSequence: 1920}, {StartSequence: 1921, EndSequence: 2050}, } assert.Equal(t, expected, rangesCalled) @@ -61,13 +60,10 @@ func TestParallelReingestRange(t *testing.T) { system, err = newParallelSystems(config, 1, factory) assert.NoError(t, err) result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once() - err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64) + err = system.ReingestRange([]history.LedgerRange{{1, 1024}}) result.AssertExpectations(t) expected = []history.LedgerRange{ - {StartSequence: 1, EndSequence: 64}, {StartSequence: 65, EndSequence: 128}, {StartSequence: 129, EndSequence: 192}, {StartSequence: 193, EndSequence: 256}, {StartSequence: 257, EndSequence: 320}, - {StartSequence: 321, EndSequence: 384}, {StartSequence: 385, 
EndSequence: 448}, {StartSequence: 449, EndSequence: 512}, {StartSequence: 513, EndSequence: 576}, {StartSequence: 577, EndSequence: 640}, - {StartSequence: 641, EndSequence: 704}, {StartSequence: 705, EndSequence: 768}, {StartSequence: 769, EndSequence: 832}, {StartSequence: 833, EndSequence: 896}, {StartSequence: 897, EndSequence: 960}, - {StartSequence: 961, EndSequence: 1024}, + {StartSequence: 1, EndSequence: 1024}, } assert.NoError(t, err) assert.Equal(t, expected, rangesCalled) @@ -77,19 +73,19 @@ func TestParallelReingestRangeError(t *testing.T) { config := Config{} result := &mockSystem{} // Fail on the second range - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once() + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Return(errors.New("failed because of foo")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1537, 2050]: error when processing [1537, 1792] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) } func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { @@ -98,27 +94,27 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { wg.Add(1) result := &mockSystem{} // Fail on an lower subrange after the first error - result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Run(func(mock.Arguments) { // Wait for a more recent range to error wg.Wait() // This sleep should help making sure the result of this range is processed later than the one below // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) time.Sleep(50 * time.Millisecond) }).Return(errors.New("failed because of foo")).Once() - result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) { + result.On("ReingestRange", []history.LedgerRange{{1281, 1920}}, false, false).Run(func(mock.Arguments) { wg.Done() }).Return(errors.New("failed because of bar")).Once() result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil)) - result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once() + result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once() factory := func(c Config) (System, error) { return result, nil } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) - err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258) + err = system.ReingestRange([]history.LedgerRange{{1, 2050}}) result.AssertExpectations(t) assert.Error(t, err) - assert.Equal(t, "job failed, recommended restart range: [1025, 2050]: error when 
processing [1025, 1280] range: failed because of foo", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error()) }
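
Note (not part of the patch): with the --parallel-job-size suggestion removed, the reingestion batch size is derived only from the total range size and the worker count, and the buffered storage backend now clamps its buffer to the size of a bounded range. The standalone Go sketch below reproduces the arithmetic implied by the updated TestCalculateParallelLedgerBatchSize expectations and mirrors the new guard in ledger_buffer.go. It is illustrative only: the constant minBatchSize = 64 and the round-down to a multiple of it are assumptions inferred from the expected test values (for example 20096/3 rounding down to 6656), and clampBufferSize is a hypothetical helper standing in for the ordered.Min call, not the project's actual code.

package main

import "fmt"

// minBatchSize is assumed to be 64, consistent with the expected
// values in TestCalculateParallelLedgerBatchSize above.
const minBatchSize = 64

// calculateParallelLedgerBatchSize sketches the post-patch rule:
// split the range evenly across workers, enforce a floor, and round
// down to a multiple of minBatchSize (the rounding is an assumption
// inferred from the test expectations, not shown in the hunks above).
func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 {
	// let every worker get a share of the range
	batchSize := rangeSize / uint32(workerCount)
	// keep batches large enough to be worth the per-job overhead
	if batchSize < minBatchSize {
		return minBatchSize
	}
	// round down to a multiple of the minimum batch size (assumed)
	return batchSize / minBatchSize * minBatchSize
}

// clampBufferSize is a hypothetical stand-in for the ordered.Min call
// added to newLedgerBuffer: a bounded range of N ledgers never gets a
// buffer larger than N.
func clampBufferSize(bufferSize, from, to uint32) uint32 {
	if rangeSize := to - from + 1; bufferSize > rangeSize {
		return rangeSize
	}
	return bufferSize
}

func main() {
	fmt.Println(calculateParallelLedgerBatchSize(20096, 3)) // 6656
	fmt.Println(calculateParallelLedgerBatchSize(20096, 4)) // 4992
	fmt.Println(calculateParallelLedgerBatchSize(2, 4))     // 64
	fmt.Println(calculateParallelLedgerBatchSize(20096, 1)) // 20096
	fmt.Println(clampBufferSize(100, 1, 15))                // 15, as in TestNewLedgerBufferSizeLargerThanRangeSize
	fmt.Println(clampBufferSize(10, 10, 30))                // 10, as in TestNewLedgerBufferSizeLessThanRangeSize
}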