Skip to content

Commit

Permalink
Remove --parallel-job-size config parameter used for reingestion.
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Oct 3, 2024
1 parent ba11f09 commit be2f5b4
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 115 deletions.
44 changes: 44 additions & 0 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) {
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) {
startLedger := uint32(10)
endLedger := uint32(30)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 10
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := startLedger; i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) {
startLedger := uint32(1)
endLedger := uint32(15)
bsb := createBufferedStorageBackendForTesting()
bsb.config.NumWorkers = 2
bsb.config.BufferSize = 100
ledgerRange := BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
bsb.dataStore = mockDataStore

ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50)
assert.NoError(t, err)

for i := uint32(0); i < endLedger; i++ {
lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
assert.NoError(t, err)
assert.Equal(t, xdr.Uint32(startLedger+i), lcm.StartSequence)
}
assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
}

func TestBSBGetLatestLedgerSequence(t *testing.T) {
startLedger := uint32(3)
endLedger := uint32(5)
Expand Down
6 changes: 6 additions & 0 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/stellar/go/support/collections/heap"
"github.com/stellar/go/support/compressxdr"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/ordered"

"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
less := func(a, b ledgerBatchObject) bool {
return a.startLedger < b.startLedger
}
// ensure BufferSize does not exceed the total range
if ledgerRange.bounded {
bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1))
}
pq := heap.New(less, int(bsb.config.BufferSize))

ledgerBuffer := &ledgerBuffer{
Expand Down
22 changes: 1 addition & 21 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var (
dbDetectGapsCmd *cobra.Command
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
ledgerBackendStr string
Expand Down Expand Up @@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions {
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
Expand Down Expand Up @@ -186,9 +177,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}

maxLedgersPerFlush := ingest.MaxLedgersPerFlush
if parallelJobSize < maxLedgersPerFlush {
maxLedgersPerFlush = parallelJobSize
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
Expand Down Expand Up @@ -219,10 +207,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
return systemErr
}

return system.ReingestRange(
ledgerRanges,
parallelJobSize,
)
return system.ReingestRange(ledgerRanges)
}

system, systemErr := ingest.NewSystem(ingestConfig)
Expand Down Expand Up @@ -486,11 +471,6 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
return err
}
// when using buffered storage, performance observations have noted optimal parallel batch size
// of 100, apply that as default if the flag was absent.
if !viper.IsSet("parallel-job-size") {
parallelJobSize = 100
}
options.NoCaptiveCore = true
}

Expand Down
62 changes: 0 additions & 62 deletions services/horizon/cmd/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,6 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) {
s.rootCmd = NewRootCmd()
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100))
}

func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(100_000))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--stellar-core-binary-path", "/test/core/bin/path",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "captive-core",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
}

func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() {
s.rootCmd.SetArgs([]string{
"db", "reingest", "range",
"--db-url", s.db.DSN,
"--network", "testnet",
"--parallel-workers", "2",
"--parallel-job-size", "5",
"--ledgerbackend", "datastore",
"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
"2",
"10"})

require.NoError(s.T(), s.rootCmd.Execute())
require.Equal(s.T(), parallelJobSize, uint32(5))
}

func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() {
tests := []struct {
name string
Expand Down
14 changes: 6 additions & 8 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,10 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
return lowestLedger
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
batchSize := batchSizeSuggestion
if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
// let's try to make use of all the workers
batchSize = rangeSize / uint32(workerCount)
}
func calculateParallelLedgerBatchSize(rangeSize uint32, workerCount uint) uint32 {
// let's try to make use of all the workers
batchSize := rangeSize / uint32(workerCount)

// Use a minimum batch size to make it worth it in terms of overhead
if batchSize < minBatchSize {
batchSize = minBatchSize
Expand All @@ -136,9 +134,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
return sum
}

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error {
func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
var (
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount)
batchSize = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), ps.workerCount)
reingestJobQueue = make(chan history.LedgerRange)
wg sync.WaitGroup

Expand Down
44 changes: 20 additions & 24 deletions services/horizon/internal/ingest/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
)

func TestCalculateParallelLedgerBatchSize(t *testing.T) {
assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1))
assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 3))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4))
assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 4))
assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 4))
assert.Equal(t, uint32(20096), calculateParallelLedgerBatchSize(20096, 1))
}

func TestParallelReingestRange(t *testing.T) {
Expand All @@ -45,29 +45,25 @@ func TestParallelReingestRange(t *testing.T) {
}
system, err := newParallelSystems(config, 3, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
assert.NoError(t, err)

sort.Slice(rangesCalled, func(i, j int) bool {
return rangesCalled[i].StartSequence < rangesCalled[j].StartSequence
})
expected := []history.LedgerRange{
{StartSequence: 1, EndSequence: 256}, {StartSequence: 257, EndSequence: 512}, {StartSequence: 513, EndSequence: 768}, {StartSequence: 769, EndSequence: 1024}, {StartSequence: 1025, EndSequence: 1280},
{StartSequence: 1281, EndSequence: 1536}, {StartSequence: 1537, EndSequence: 1792}, {StartSequence: 1793, EndSequence: 2048}, {StartSequence: 2049, EndSequence: 2050},
{StartSequence: 1, EndSequence: 640}, {StartSequence: 641, EndSequence: 1280}, {StartSequence: 1281, EndSequence: 1920}, {StartSequence: 1921, EndSequence: 2050},
}
assert.Equal(t, expected, rangesCalled)

rangesCalled = nil
system, err = newParallelSystems(config, 1, factory)
assert.NoError(t, err)
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once()
err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64)
err = system.ReingestRange([]history.LedgerRange{{1, 1024}})
result.AssertExpectations(t)
expected = []history.LedgerRange{
{StartSequence: 1, EndSequence: 64}, {StartSequence: 65, EndSequence: 128}, {StartSequence: 129, EndSequence: 192}, {StartSequence: 193, EndSequence: 256}, {StartSequence: 257, EndSequence: 320},
{StartSequence: 321, EndSequence: 384}, {StartSequence: 385, EndSequence: 448}, {StartSequence: 449, EndSequence: 512}, {StartSequence: 513, EndSequence: 576}, {StartSequence: 577, EndSequence: 640},
{StartSequence: 641, EndSequence: 704}, {StartSequence: 705, EndSequence: 768}, {StartSequence: 769, EndSequence: 832}, {StartSequence: 833, EndSequence: 896}, {StartSequence: 897, EndSequence: 960},
{StartSequence: 961, EndSequence: 1024},
{StartSequence: 1, EndSequence: 1024},
}
assert.NoError(t, err)
assert.Equal(t, expected, rangesCalled)
Expand All @@ -77,19 +73,19 @@ func TestParallelReingestRangeError(t *testing.T) {
config := Config{}
result := &mockSystem{}
// Fail on the second range
result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once()
result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Return(errors.New("failed because of foo")).Once()

Check failure on line 76 in services/horizon/internal/ingest/parallel_test.go

View workflow job for this annotation

GitHub Actions / golangci

result.On undefined (type *mockSystem has no field or method On) (typecheck)
result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil)
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once()
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once()

Check failure on line 78 in services/horizon/internal/ingest/parallel_test.go

View workflow job for this annotation

GitHub Actions / golangci

result.On undefined (type *mockSystem has no field or method On) (typecheck)

factory := func(c Config) (System, error) {
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
result.AssertExpectations(t)
assert.Error(t, err)
assert.Equal(t, "job failed, recommended restart range: [1537, 2050]: error when processing [1537, 1792] range: failed because of foo", err.Error())
assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error())
}

func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
Expand All @@ -98,27 +94,27 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
wg.Add(1)
result := &mockSystem{}
// Fail on an lower subrange after the first error
result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) {
result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Run(func(mock.Arguments) {

Check failure on line 97 in services/horizon/internal/ingest/parallel_test.go

View workflow job for this annotation

GitHub Actions / golangci

result.On undefined (type *mockSystem has no field or method On) (typecheck)
// Wait for a more recent range to error
wg.Wait()
// This sleep should help making sure the result of this range is processed later than the one below
// (there are no guarantees without instrumenting ReingestRange(), but that's too complicated)
time.Sleep(50 * time.Millisecond)
}).Return(errors.New("failed because of foo")).Once()
result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) {
result.On("ReingestRange", []history.LedgerRange{{1281, 1920}}, false, false).Run(func(mock.Arguments) {
wg.Done()
}).Return(errors.New("failed because of bar")).Once()
result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil))
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once()
result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once()

factory := func(c Config) (System, error) {
return result, nil
}
system, err := newParallelSystems(config, 3, factory)
assert.NoError(t, err)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
result.AssertExpectations(t)
assert.Error(t, err)
assert.Equal(t, "job failed, recommended restart range: [1025, 2050]: error when processing [1025, 1280] range: failed because of foo", err.Error())
assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error())

}

0 comments on commit be2f5b4

Please sign in to comment.