diff --git a/pkg/checksum/checker_test.go b/pkg/checksum/checker_test.go index 6853433..fb3eda7 100644 --- a/pkg/checksum/checker_test.go +++ b/pkg/checksum/checker_test.go @@ -3,6 +3,7 @@ package checksum import ( "context" "testing" + "time" "github.com/cashapp/spirit/pkg/testutils" @@ -36,9 +37,9 @@ func TestBasicChecksum(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) @@ -71,9 +72,9 @@ func TestBasicValidation(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) @@ -108,9 +109,9 @@ func TestFixCorrupt(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) @@ -151,9 +152,9 @@ func TestCorruptChecksum(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) @@ -183,9 +184,9 @@ func TestBoundaryCases(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) @@ -247,9 +248,9 @@ func TestChangeDataTypeDatetime(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, feed.Run()) diff --git a/pkg/migration/cutover_test.go b/pkg/migration/cutover_test.go index a216958..8e9e3c5 100644 --- a/pkg/migration/cutover_test.go +++ b/pkg/migration/cutover_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "testing" + "time" "github.com/cashapp/spirit/pkg/dbconn" "github.com/cashapp/spirit/pkg/repl" @@ -44,9 +45,9 @@ func TestCutOver(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) // the feed must be started. 
assert.NoError(t, feed.Run()) @@ -103,9 +104,9 @@ func TestMDLLockFails(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) // the feed must be started. assert.NoError(t, feed.Run()) @@ -141,9 +142,9 @@ func TestInvalidOptions(t *testing.T) { cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) _, err = NewCutOver(db, nil, t1new, feed, dbconn.NewDBConfig(), logger) assert.Error(t, err) diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 3e7784b..eb85850 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -457,9 +457,9 @@ func (r *Runner) setup(ctx context.Context) error { return err } r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{ - Logger: r.logger, - Concurrency: r.migration.Threads, - BatchSize: repl.DefaultBatchSize, + Logger: r.logger, + Concurrency: r.migration.Threads, + TargetBatchTime: r.migration.TargetChunkTime, }) // Start the binary log feed now if err := r.replClient.Run(); err != nil { @@ -716,9 +716,9 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error { // Set the binlog position. // Create a binlog subscriber r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{ - Logger: r.logger, - Concurrency: r.migration.Threads, - BatchSize: repl.DefaultBatchSize, + Logger: r.logger, + Concurrency: r.migration.Threads, + TargetBatchTime: r.migration.TargetChunkTime, }) r.replClient.SetPos(mysql.Position{ Name: binlogName, diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index 8a243ee..6ab70fc 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -761,9 +761,9 @@ func TestCheckpoint(t *testing.T) { assert.NoError(t, r.alterNewTable(context.TODO())) assert.NoError(t, r.createCheckpointTable(context.TODO())) r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{ - Logger: logrus.New(), // don't use the logger for migration since we feed status to it. - Concurrency: 4, - BatchSize: 10000, + Logger: logrus.New(), // don't use the logger for migration since we feed status to it. 
+ Concurrency: 4, + TargetBatchTime: r.migration.TargetChunkTime, }) r.copier, err = row.NewCopier(r.db, r.table, r.newTable, row.NewCopierDefaultConfig()) assert.NoError(t, err) @@ -909,9 +909,9 @@ func TestCheckpointRestore(t *testing.T) { assert.NoError(t, r.createCheckpointTable(context.TODO())) r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{ - Logger: logrus.New(), - Concurrency: 4, - BatchSize: 10000, + Logger: logrus.New(), + Concurrency: 4, + TargetBatchTime: r.migration.TargetChunkTime, }) r.copier, err = row.NewCopier(r.db, r.table, r.newTable, row.NewCopierDefaultConfig()) assert.NoError(t, err) @@ -995,9 +995,9 @@ func TestCheckpointDifferentRestoreOptions(t *testing.T) { assert.NoError(t, m.createCheckpointTable(context.TODO())) logger := logrus.New() m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: m.migration.TargetChunkTime, }) m.copier, err = row.NewCopier(m.db, m.table, m.newTable, row.NewCopierDefaultConfig()) assert.NoError(t, err) @@ -1190,9 +1190,9 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) { assert.NoError(t, m.createCheckpointTable(context.TODO())) logger := logrus.New() m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: m.migration.TargetChunkTime, }) m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{ Concurrency: m.migration.Threads, @@ -1314,9 +1314,9 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) { assert.NoError(t, m.createCheckpointTable(context.TODO())) logger := logrus.New() m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: m.migration.TargetChunkTime, }) m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{ Concurrency: m.migration.Threads, @@ -1915,9 +1915,9 @@ func TestE2ERogueValues(t *testing.T) { assert.NoError(t, m.createCheckpointTable(context.TODO())) logger := logrus.New() m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: repl.DefaultBatchSize, + Logger: logger, + Concurrency: 4, + TargetBatchTime: m.migration.TargetChunkTime, }) m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{ Concurrency: m.migration.Threads, @@ -2077,9 +2077,9 @@ func TestResumeFromCheckpointPhantom(t *testing.T) { assert.NoError(t, m.createCheckpointTable(ctx)) logger := logrus.New() m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: repl.DefaultBatchSize, + Logger: logger, + Concurrency: 4, + TargetBatchTime: m.migration.TargetChunkTime, }) m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{ Concurrency: m.migration.Threads, diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 2a00f7a..5701e01 
100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -31,7 +31,12 @@ const ( // Since on some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second, // we probably shouldn't set this any larger than about 1K. It will also use // multiple-flush-threads, which should help it group commit and still be fast. + // This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime. DefaultBatchSize = 1000 + + // DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements. + DefaultTargetBatchTime = time.Millisecond * 500 + // DefaultFlushInterval is the time that the client will flush all binlog changes to disk. // Longer values require more memory, but permit more merging. // I expect we will change this to 1hr-24hr in the future. @@ -42,6 +47,22 @@ type queuedChange struct { key string isDelete bool } + +type statement struct { + numKeys int + stmt string +} + +func extractStmt(stmts []statement) []string { + var trimmed []string + for _, stmt := range stmts { + if stmt.stmt != "" { + trimmed = append(trimmed, stmt.stmt) + } + } + return trimmed +} + type Client struct { canal.DummyEventHandler sync.Mutex @@ -76,8 +97,11 @@ type Client struct { isClosed bool - batchSize int64 - concurrency int + statisticsLock sync.Mutex + targetBatchTime time.Duration + targetBatchSize int64 // will auto-adjust over time, use atomic to read/set + timingHistory []time.Duration + concurrency int // The periodic flush lock is just used for ensuring only one periodic flush runs at a time, // and when we disable it, no more periodic flushes will run. The actual flushing is protected @@ -98,23 +122,24 @@ func NewClient(db *sql.DB, host string, table, newTable *table.TableInfo, userna password: password, binlogChangeset: make(map[string]bool), logger: config.Logger, - batchSize: config.BatchSize, + targetBatchTime: config.TargetBatchTime, + targetBatchSize: DefaultBatchSize, // initial starting value. concurrency: config.Concurrency, } } type ClientConfig struct { - BatchSize int64 - Concurrency int - Logger loggers.Advanced + TargetBatchTime time.Duration + Concurrency int + Logger loggers.Advanced } // NewClientDefaultConfig returns a default config for the copier. func NewClientDefaultConfig() *ClientConfig { return &ClientConfig{ - Concurrency: 4, - BatchSize: DefaultBatchSize, - Logger: logrus.New(), + Concurrency: 4, + TargetBatchTime: DefaultTargetBatchTime, + Logger: logrus.New(), } } @@ -405,13 +430,14 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta } // Otherwise, flush the changes. - var stmts []string + var stmts []statement var buffer []string prevKey := changesToFlush[0] // for initialization + target := int(atomic.LoadInt64(&c.targetBatchSize)) for _, change := range changesToFlush { // We are changing from DELETE to REPLACE // or vice versa, *or* the buffer is getting very large. - if change.isDelete != prevKey.isDelete || len(buffer) > DefaultBatchSize { + if change.isDelete != prevKey.isDelete || len(buffer) > target { if prevKey.isDelete { stmts = append(stmts, c.createDeleteStmt(buffer)) } else { @@ -432,13 +458,13 @@ func (c *Client) flushQueue(ctx context.Context, underLock bool, lock *dbconn.Ta // Execute under lock means it is a final flush // We need to use the lock connection to do this // so there is no parallelism. 
- if err := lock.ExecUnderLock(ctx, stmts); err != nil { + if err := lock.ExecUnderLock(ctx, extractStmt(stmts)); err != nil { return err } } else { // Execute the statements in a transaction. // They still need to be single threaded. - if _, err := dbconn.RetryableTransaction(ctx, c.db, true, dbconn.NewDBConfig(), stmts...); err != nil { + if _, err := dbconn.RetryableTransaction(ctx, c.db, true, dbconn.NewDBConfig(), extractStmt(stmts)...); err != nil { return err } } @@ -468,8 +494,9 @@ func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.Tabl // We must now apply the changeset setToFlush to the new table. var deleteKeys []string var replaceKeys []string - var stmts []string + var stmts []statement var i int64 + target := atomic.LoadInt64(&c.targetBatchSize) for key, isDelete := range setToFlush { i++ if isDelete { @@ -477,12 +504,12 @@ func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.Tabl } else { replaceKeys = append(replaceKeys, key) } - if (i % c.batchSize) == 0 { + if (i % target) == 0 { stmts = append(stmts, c.createDeleteStmt(deleteKeys)) stmts = append(stmts, c.createReplaceStmt(replaceKeys)) deleteKeys = []string{} replaceKeys = []string{} - atomic.AddInt64(&c.binlogChangesetDelta, -c.batchSize) + atomic.AddInt64(&c.binlogChangesetDelta, -target) } } stmts = append(stmts, c.createDeleteStmt(deleteKeys)) @@ -492,7 +519,7 @@ func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.Tabl // Execute under lock means it is a final flush // We need to use the lock connection to do this // so there is no parallelism. - if err := lock.ExecUnderLock(ctx, stmts); err != nil { + if err := lock.ExecUnderLock(ctx, extractStmt(stmts)); err != nil { return err } } else { @@ -505,7 +532,9 @@ func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.Tabl for _, stmt := range stmts { s := stmt g.Go(func() error { - _, err := dbconn.RetryableTransaction(errGrpCtx, c.db, false, dbconn.NewDBConfig(), s) + startTime := time.Now() + _, err := dbconn.RetryableTransaction(errGrpCtx, c.db, false, dbconn.NewDBConfig(), s.stmt) + c.feedback(s.numKeys, time.Since(startTime)) return err }) } @@ -520,7 +549,7 @@ func (c *Client) flushMap(ctx context.Context, underLock bool, lock *dbconn.Tabl return nil } -func (c *Client) createDeleteStmt(deleteKeys []string) string { +func (c *Client) createDeleteStmt(deleteKeys []string) statement { var deleteStmt string if len(deleteKeys) > 0 { deleteStmt = fmt.Sprintf("DELETE FROM %s WHERE (%s) IN (%s)", @@ -529,10 +558,13 @@ func (c *Client) createDeleteStmt(deleteKeys []string) string { c.pksToRowValueConstructor(deleteKeys), ) } - return deleteStmt + return statement{ + numKeys: len(deleteKeys), + stmt: deleteStmt, + } } -func (c *Client) createReplaceStmt(replaceKeys []string) string { +func (c *Client) createReplaceStmt(replaceKeys []string) statement { var replaceStmt string if len(replaceKeys) > 0 { replaceStmt = fmt.Sprintf("REPLACE INTO %s (%s) SELECT %s FROM %s FORCE INDEX (PRIMARY) WHERE (%s) IN (%s)", @@ -544,7 +576,46 @@ func (c *Client) createReplaceStmt(replaceKeys []string) string { c.pksToRowValueConstructor(replaceKeys), ) } - return replaceStmt + return statement{ + numKeys: len(replaceKeys), + stmt: replaceStmt, + } +} + +// feedback provides feedback on the apply time of changesets. +// We use this to refine the targetBatchSize. This is a little bit +// different for feedback for the copier, because frequently the batches +// will not be full. 
We still need to use a p90-like mechanism though, +// because the rows being changed are by definition more likely to be hotspots, +// and hotspots mean lock contention. This is one of the exact reasons we chunk +// in the first place: the applier is far more likely than the copier to +// impact OLTP workloads. +func (c *Client) feedback(numberOfKeys int, d time.Duration) { + c.statisticsLock.Lock() + defer c.statisticsLock.Unlock() + if numberOfKeys == 0 { + // If the number of keys is zero, we can't + // calculate anything, so we just return. + return + } + // Because numberOfKeys is variable, we store the timePerKey rather + // than all of the previous durations, and adjust the targetBatchSize + // based on it. This creates some skew: small batches tend to have a + // higher timePerKey, which can create a backlog, which in turn produces + // larger batches with a smaller timePerKey. So the skew *should* at + // least be self-correcting, although this has not + // yet been proven. + timePerKey := d / time.Duration(numberOfKeys) + c.timingHistory = append(c.timingHistory, timePerKey) + + // Once we have enough feedback, re-evaluate the target batch size + // based on the p90 timePerKey. + if len(c.timingHistory) >= 10 { + timePerKey := table.LazyFindP90(c.timingHistory) + newBatchSize := int64(float64(c.targetBatchTime) / float64(timePerKey)) + atomic.StoreInt64(&c.targetBatchSize, newBatchSize) + c.timingHistory = nil // reset + } } // Flush empties the changeset in a loop until the amount of changes is considered "trivial". @@ -610,7 +681,10 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration) c.logger.Errorf("error flushing binary log: %v", err) } c.periodicFlushLock.Unlock() - c.logger.Infof("finished periodic flush of binary log: duration=%v", time.Since(startLoop)) + c.logger.Infof("finished periodic flush of binary log: total-duration=%v batch-size=%d", + time.Since(startLoop), + atomic.LoadInt64(&c.targetBatchSize), + ) } } } diff --git a/pkg/repl/client_test.go b/pkg/repl/client_test.go index a6e003b..a2c7d0a 100644 --- a/pkg/repl/client_test.go +++ b/pkg/repl/client_test.go @@ -36,9 +36,9 @@ func TestReplClient(t *testing.T) { cfg, err := mysql2.ParseDSN(testutils.DSN()) assert.NoError(t, err) client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.NoError(t, client.Run()) defer client.Close() @@ -151,9 +151,9 @@ func TestReplClientResumeFromImpossible(t *testing.T) { cfg, err := mysql2.ParseDSN(testutils.DSN()) assert.NoError(t, err) client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) client.SetPos(mysql.Position{ Name: "impossible", @@ -180,9 +180,9 @@ func TestReplClientResumeFromPoint(t *testing.T) { cfg, err := mysql2.ParseDSN(testutils.DSN()) assert.NoError(t, err) client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) pos, err := client.getCurrentBinlogPosition() assert.NoError(t, err) @@ -215,9 +215,9 @@ func TestReplClientOpts(t *testing.T) { cfg, err := mysql2.ParseDSN(testutils.DSN()) assert.NoError(t, err) client := NewClient(db,
cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &ClientConfig{ - Logger: logger, - Concurrency: 4, - BatchSize: 10000, + Logger: logger, + Concurrency: 4, + TargetBatchTime: time.Second, }) assert.Equal(t, 0, db.Stats().InUse) // no connections in use. assert.NoError(t, client.Run()) @@ -313,3 +313,46 @@ func TestReplClientQueue(t *testing.T) { assert.NoError(t, client.Flush(context.TODO())) assert.Equal(t, client.GetDeltaLen(), 0) } + +func TestFeedback(t *testing.T) { + db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) + assert.NoError(t, err) + + testutils.RunSQL(t, "DROP TABLE IF EXISTS feedbackt1, feedbackt2, _feedbackt1_chkpnt") + testutils.RunSQL(t, "CREATE TABLE feedbackt1 (a VARCHAR(255) NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE feedbackt2 (a VARCHAR(255) NOT NULL, b INT, c INT, PRIMARY KEY (a))") + testutils.RunSQL(t, "CREATE TABLE _feedbackt1_chkpnt (a int)") // just used to advance binlog + + t1 := table.NewTableInfo(db, "test", "feedbackt1") + assert.NoError(t, t1.SetInfo(context.TODO())) + t2 := table.NewTableInfo(db, "test", "feedbackt2") + assert.NoError(t, t2.SetInfo(context.TODO())) + + cfg, err := mysql2.ParseDSN(testutils.DSN()) + assert.NoError(t, err) + + client := NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, NewClientDefaultConfig()) + assert.NoError(t, client.Run()) + defer client.Close() + + // initial values expected: + assert.Equal(t, time.Millisecond*500, client.targetBatchTime) + assert.Equal(t, int64(1000), client.targetBatchSize) + + // Simulate batches completing 5x faster than the target time. + // Run 9 samples first; the 10th triggers a re-evaluation. + for i := 0; i < 9; i++ { + client.feedback(1000, time.Millisecond*100) + } + assert.Equal(t, int64(1000), client.targetBatchSize) // no change yet + client.feedback(0, time.Millisecond*100) // no keys, should not cause change. + assert.Equal(t, int64(1000), client.targetBatchSize) // no change yet + client.feedback(1000, time.Millisecond*100) // 10th sample. + assert.Equal(t, int64(5000), client.targetBatchSize) // 500ms target / 100µs per key = 5000. + + // Now simulate slower batches: 1000 keys in 1s (1ms per key). + for i := 0; i < 10; i++ { + client.feedback(1000, time.Second) + } + assert.Equal(t, int64(500), client.targetBatchSize) // 500ms target / 1ms per key = 500.
+} diff --git a/pkg/table/chunker_composite.go b/pkg/table/chunker_composite.go index 42815e1..40abe5d 100644 --- a/pkg/table/chunker_composite.go +++ b/pkg/table/chunker_composite.go @@ -386,7 +386,7 @@ func (t *chunkerComposite) boundaryCheckTargetChunkSize(newTarget uint64) uint64 func (t *chunkerComposite) calculateNewTargetChunkSize() uint64 { // We do all our math as float64 of time in ns - p90 := float64(lazyFindP90(t.chunkTimingInfo)) + p90 := float64(LazyFindP90(t.chunkTimingInfo)) targetTime := float64(t.ChunkerTarget) newTargetRows := float64(t.chunkSize) * (targetTime / p90) return uint64(newTargetRows) diff --git a/pkg/table/chunker_optimistic.go b/pkg/table/chunker_optimistic.go index 48c9dbc..81ff01d 100644 --- a/pkg/table/chunker_optimistic.go +++ b/pkg/table/chunker_optimistic.go @@ -390,7 +390,7 @@ func (t *chunkerOptimistic) boundaryCheckTargetChunkSize(newTarget uint64) uint6 func (t *chunkerOptimistic) calculateNewTargetChunkSize() uint64 { // We do all our math as float64 of time in ns - p90 := float64(lazyFindP90(t.chunkTimingInfo)) + p90 := float64(LazyFindP90(t.chunkTimingInfo)) targetTime := float64(t.ChunkerTarget) newTargetRows := float64(t.chunkSize) * (targetTime / p90) // switch to prefetch chunking if: diff --git a/pkg/table/utils.go b/pkg/table/utils.go index 5c6f7b4..9d5e09e 100644 --- a/pkg/table/utils.go +++ b/pkg/table/utils.go @@ -8,10 +8,10 @@ import ( "time" ) -// lazyFindP90 finds the second to last value in a slice. +// LazyFindP90 finds the second to last value in a slice. // This is the same as a p90 if there are 10 values, but if // there were 100 values it would technically be a p99 etc. -func lazyFindP90(a []time.Duration) time.Duration { +func LazyFindP90(a []time.Duration) time.Duration { sort.Slice(a, func(i, j int) bool { return a[i] > a[j] }) diff --git a/pkg/table/utils_test.go b/pkg/table/utils_test.go index eca5b80..06aa636 100644 --- a/pkg/table/utils_test.go +++ b/pkg/table/utils_test.go @@ -20,7 +20,7 @@ func TestFindP90(t *testing.T) { 1 * time.Second, 1 * time.Second, } - assert.Equal(t, 3*time.Second, lazyFindP90(times)) + assert.Equal(t, 3*time.Second, LazyFindP90(times)) } type castableTpTest struct {
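Illustrative sketch (not part of the diff above): a minimal, self-contained program showing the batch-size feedback math that feedback() introduces, using the same numbers as TestFeedback. It assumes table.LazyFindP90 returns roughly the len/10-th largest sample; the p90 helper here is a hypothetical stand-in, not the real implementation.

package main

import (
	"fmt"
	"sort"
	"time"
)

// p90 is a stand-in for table.LazyFindP90: sort descending and take the
// element at index len/10 (the 2nd-largest of 10 samples).
func p90(a []time.Duration) time.Duration {
	sort.Slice(a, func(i, j int) bool { return a[i] > a[j] })
	return a[len(a)/10]
}

// newBatchSize mirrors the calculation in Client.feedback():
// targetBatchSize = targetBatchTime / p90(timePerKey).
func newBatchSize(targetBatchTime time.Duration, timings []time.Duration) int64 {
	return int64(float64(targetBatchTime) / float64(p90(timings)))
}

func main() {
	target := 500 * time.Millisecond // DefaultTargetBatchTime
	// Ten batches of 1000 keys that each applied in 100ms:
	// timePerKey = 100ms / 1000 = 100µs per key.
	var timings []time.Duration
	for i := 0; i < 10; i++ {
		timings = append(timings, 100*time.Millisecond/1000)
	}
	// 500ms / 100µs = 5000, matching the expectation in TestFeedback.
	fmt.Println(newBatchSize(target, timings))
}

Callers no longer pass BatchSize: they pass TargetBatchTime (the tests use time.Second, the runner passes migration.TargetChunkTime), and the client starts from DefaultBatchSize (1000) and self-adjusts from there.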