diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 1797ab27c2..faf1f7c411 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -576,6 +576,7 @@ func importData(importFileTasks []*ImportFileTask) { utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks)) prepareTableToColumns(pendingTasks) //prepare the tableToColumns map poolSize := tconf.Parallelism * 2 + maxTasksInProgress := tconf.Parallelism if tconf.EnableYBAdaptiveParallelism { // in case of adaptive parallelism, we need to use maxParallelism * 2 yb, ok := tdb.(*tgtdb.TargetYugabyteDB) @@ -593,7 +594,7 @@ func importData(importFileTasks []*ImportFileTask) { useTaskPicker := utils.GetEnvAsBool("USE_TASK_PICKER_FOR_IMPORT", true) if useTaskPicker { - err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize) + err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize, maxTasksInProgress) if err != nil { utils.ErrExit("Failed to import tasks via task picker: %s", err) } @@ -708,16 +709,30 @@ func importData(importFileTasks []*ImportFileTask) { - For the task that is picked, produce the next batch and submit it to the worker pool. Worker will asynchronously import the batch. - If task is done, mark it as done in the task picker. */ -func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int) error { +func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int, maxTasksInProgress int) error { // The code can produce `poolSize` number of batches at a time. But, it can consume only // `parallelism` number of batches at a time. batchImportPool = pool.New().WithMaxGoroutines(poolSize) log.Infof("created batch import pool of size: %d", poolSize) - taskPicker, err := NewSequentialTaskPicker(pendingTasks, state) - if err != nil { - return fmt.Errorf("create task picker: %w", err) + var taskPicker FileTaskPicker + var err error + if importerRole == TARGET_DB_IMPORTER_ROLE || importerRole == IMPORT_FILE_ROLE { + yb, ok := tdb.(*tgtdb.TargetYugabyteDB) + if !ok { + return fmt.Errorf("expected tdb to be of type TargetYugabyteDB, got: %T", tdb) + } + taskPicker, err = NewColocatedAwareRandomTaskPicker(maxTasksInProgress, pendingTasks, state, yb) + if err != nil { + return fmt.Errorf("create colocated aware random task picker: %w", err) + } + } else { + taskPicker, err = NewSequentialTaskPicker(pendingTasks, state) + if err != nil { + return fmt.Errorf("create sequential task picker: %w", err) + } } + taskImporters := map[int]*FileTaskImporter{} for taskPicker.HasMoreTasks() { @@ -725,6 +740,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS if err != nil { return fmt.Errorf("get next task: %w", err) } + log.Infof("Picked task for import: %s", task) var taskImporter *FileTaskImporter var ok bool taskImporter, ok = taskImporters[task.ID] @@ -741,6 +757,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS // All batches for this task have been submitted. // task could have been completed (all batches imported) OR still in progress // in case task is done, we should inform task picker so that we stop picking that task.
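Aside: `batchImportPool` above is built with `pool.New().WithMaxGoroutines(poolSize)`, which matches the `github.com/sourcegraph/conc/pool` API; the import block isn't shown in this hunk, so treat that import path as an assumption. A minimal sketch of the submit/block/wait pattern the picker loop relies on:

```go
package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool" // assumed to be the pool behind batchImportPool
)

func main() {
	// With 4 max goroutines, the 5th p.Go() call blocks until a slot frees up.
	// importTasksViaTaskPicker leans on this back-pressure: it can produce up to
	// poolSize batches while the workers import them at their own pace.
	p := pool.New().WithMaxGoroutines(4)
	for i := 0; i < 10; i++ {
		batch := i // capture the loop variable for the closure
		p.Go(func() {
			fmt.Printf("importing batch %d\n", batch)
		})
	}
	p.Wait() // block until every submitted batch has finished
}
```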
+ log.Infof("All batches submitted for task: %s", task) taskDone, err := taskImporter.AllBatchesImported() if err != nil { return fmt.Errorf("check if all batches are imported: task: %v err :%w", task, err) @@ -751,6 +768,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS if err != nil { return fmt.Errorf("mark task as done: task: %v, err: %w", task, err) } + log.Infof("Import of task done: %s", task) continue } else { // some batches are still in progress, wait for them to complete as decided by the picker. diff --git a/yb-voyager/cmd/importDataFileTaskPicker.go b/yb-voyager/cmd/importDataFileTaskPicker.go index f6a3e904eb..cac3b524eb 100644 --- a/yb-voyager/cmd/importDataFileTaskPicker.go +++ b/yb-voyager/cmd/importDataFileTaskPicker.go @@ -17,7 +17,21 @@ package cmd import ( "fmt" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/mroth/weightedrand/v2" + "github.com/samber/lo" + log "github.com/sirupsen/logrus" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" + "golang.org/x/exp/rand" +) + +const ( + SHARDED = "sharded" + COLOCATED = "colocated" ) type FileTaskPicker interface { @@ -106,3 +120,271 @@ func (s *SequentialTaskPicker) WaitForTasksBatchesTobeImported() { // Instead of having a busy-loop where we keep checking if the task is done, we can wait for a second and then check again. time.Sleep(time.Second * 1) } + +/* +Pick any table at random, but tables have probabilities. +All sharded tables have equal probabilities. +The probabilities of all colocated tables sum up to the probability of a single sharded table. + + In essence, since all colocated tables are in a single tablet, they are all considered + to represent one single table from the perspective of picking. + + If there are 4 colocated tables and 4 sharded tables, + the probabilities for each of the sharded tables will be 1/5 (4 sharded + 1 for colocated) = 0.2. + The probabilities of each of the colocated tables will be 0.2/4 = 0.05. + Therefore, (0.05 x 4) + (0.2 x 4) = 1 + +At any given time, only X distinct tables can be IN-PROGRESS. If X=4, after picking four distinct tables, + + we will not pick a new 5th table. Only when one of the four is completely imported, + we can go on to pick a different table. This is just to make it slightly easier from a + status reporting/debugging perspective. + During this time, each of the in-progress tables will be picked with equal probability. +*/ +type ColocatedAwareRandomTaskPicker struct { + doneTasks []*ImportFileTask + // tasks which the picker has picked at least once, and are essentially in progress. + // the length of this list will be <= maxTasksInProgress + inProgressTasks []*ImportFileTask + maxTasksInProgress int + + // tasks which have not yet been picked even once. + // the tableChooser will be employed to pick a table from this list. 
+ tableWisePendingTasks *utils.StructMap[sqlname.NameTuple, []*ImportFileTask] + tableChooser *weightedrand.Chooser[sqlname.NameTuple, int] + + tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded + +} + +type YbTargetDBColocatedChecker interface { + IsDBColocated() (bool, error) + IsTableColocated(tableName sqlname.NameTuple) (bool, error) +} + +func NewColocatedAwareRandomTaskPicker(maxTasksInProgress int, tasks []*ImportFileTask, state *ImportDataState, yb YbTargetDBColocatedChecker) (*ColocatedAwareRandomTaskPicker, error) { + var doneTasks []*ImportFileTask + var inProgressTasks []*ImportFileTask + tableWisePendingTasks := utils.NewStructMap[sqlname.NameTuple, []*ImportFileTask]() + tableTypes := utils.NewStructMap[sqlname.NameTuple, string]() + + isDBColocated, err := yb.IsDBColocated() + if err != nil { + return nil, fmt.Errorf("checking if db is colocated: %w", err) + } + + addToPendingTasks := func(t *ImportFileTask) { + // put into the table wise pending tasks. + var tablePendingTasks []*ImportFileTask + var ok bool + tablePendingTasks, ok = tableWisePendingTasks.Get(t.TableNameTup) + if !ok { + tablePendingTasks = []*ImportFileTask{} + } + tablePendingTasks = append(tablePendingTasks, t) + tableWisePendingTasks.Put(t.TableNameTup, tablePendingTasks) + } + + for _, task := range tasks { + tableName := task.TableNameTup + + // set tableType if not already set + if _, ok := tableTypes.Get(tableName); !ok { + var tableType string + if !isDBColocated { + tableType = SHARDED + } else { + isColocated, err := yb.IsTableColocated(tableName) + if err != nil { + return nil, fmt.Errorf("checking if table is colocated: table: %v: %w", tableName, err) + } + tableType = lo.Ternary(isColocated, COLOCATED, SHARDED) + } + tableTypes.Put(tableName, tableType) + } + + // put task into right bucket. 
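+ // - FILE_IMPORT_COMPLETED -> doneTasks
+ // - FILE_IMPORT_IN_PROGRESS -> inProgressTasks (capped at maxTasksInProgress; overflow is treated as pending)
+ // - FILE_IMPORT_NOT_STARTED -> tableWisePendingTasks, keyed by table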
+ taskStatus, err := state.GetFileImportState(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("getting file import state for task: %v: %w", task, err) + } + switch taskStatus { + case FILE_IMPORT_COMPLETED: + doneTasks = append(doneTasks, task) + case FILE_IMPORT_IN_PROGRESS: + if len(inProgressTasks) < maxTasksInProgress { + inProgressTasks = append(inProgressTasks, task) + } else { + addToPendingTasks(task) + } + case FILE_IMPORT_NOT_STARTED: + addToPendingTasks(task) + default: + return nil, fmt.Errorf("unexpected status for task: %v: %v", task, taskStatus) + } + } + + picker := &ColocatedAwareRandomTaskPicker{ + doneTasks: doneTasks, + inProgressTasks: inProgressTasks, + maxTasksInProgress: maxTasksInProgress, + tableWisePendingTasks: tableWisePendingTasks, + tableTypes: tableTypes, + } + if len(picker.tableWisePendingTasks.Keys()) > 0 { + err = picker.initializeChooser() + if err != nil { + return nil, fmt.Errorf("initializing chooser: %w", err) + } + } + + log.Infof("ColocatedAwareRandomTaskPicker initialized with params:%v", spew.Sdump(picker)) + return picker, nil +} + +func (c *ColocatedAwareRandomTaskPicker) Pick() (*ImportFileTask, error) { + if !c.HasMoreTasks() { + return nil, fmt.Errorf("no more tasks") + } + + // if we have already picked maxTasksInProgress tasks, pick a task from inProgressTasks + if len(c.inProgressTasks) == c.maxTasksInProgress { + return c.PickTaskFromInProgressTasks() + } + + // if we have less than maxTasksInProgress tasks in progress, but no pending tasks, pick a task from inProgressTasks + if len(c.inProgressTasks) < c.maxTasksInProgress && len(c.tableWisePendingTasks.Keys()) == 0 { + return c.PickTaskFromInProgressTasks() + } + + // pick a new task from pending tasks + return c.PickTaskFromPendingTasks() +} + +func (c *ColocatedAwareRandomTaskPicker) PickTaskFromInProgressTasks() (*ImportFileTask, error) { + if len(c.inProgressTasks) == 0 { + return nil, fmt.Errorf("no tasks in progress") + } + + // pick a random task from inProgressTasks + taskIndex := rand.Intn(len(c.inProgressTasks)) + return c.inProgressTasks[taskIndex], nil +} + +func (c *ColocatedAwareRandomTaskPicker) PickTaskFromPendingTasks() (*ImportFileTask, error) { + if len(c.tableWisePendingTasks.Keys()) == 0 { + return nil, fmt.Errorf("no pending tasks to pick from") + } + if c.tableChooser == nil { + return nil, fmt.Errorf("chooser not initialized") + } + + tablePick := c.tableChooser.Pick() + tablePendingTasks, ok := c.tableWisePendingTasks.Get(tablePick) + if !ok { + return nil, fmt.Errorf("no pending tasks for table picked: %s: %v", tablePick, c.tableWisePendingTasks) + } + + pickedTask := tablePendingTasks[0] + tablePendingTasks = tablePendingTasks[1:] + + if len(tablePendingTasks) == 0 { + c.tableWisePendingTasks.Delete(tablePick) + + // reinitialize chooser because we have removed a table from the pending list, so weights will change. + // we need to update the weights every time the list of pending tasks changes. + // We can't simply remove a choice, because the weights need to be rebalanced based on what is pending. + if len(c.tableWisePendingTasks.Keys()) > 0 { + err := c.initializeChooser() + if err != nil { + return nil, fmt.Errorf("re-initializing chooser after picking task: %v: %w", pickedTask, err) + } + } + } else { + c.tableWisePendingTasks.Put(tablePick, tablePendingTasks) + } + c.inProgressTasks = append(c.inProgressTasks, pickedTask) + log.Infof("Picked task: %v. In-Progress tasks:%v", pickedTask, c.inProgressTasks) + return pickedTask, nil +} + +func (c *ColocatedAwareRandomTaskPicker) initializeChooser() error { + if len(c.tableWisePendingTasks.Keys()) == 0 { + return fmt.Errorf("no pending tasks to initialize chooser") + } + tableNames := make([]sqlname.NameTuple, 0, len(c.tableWisePendingTasks.Keys())) + c.tableWisePendingTasks.IterKV(func(k sqlname.NameTuple, v []*ImportFileTask) (bool, error) { + tableNames = append(tableNames, k) + return true, nil + }) + + colocatedCount := 0 + for _, tableName := range tableNames { + tableType, ok := c.tableTypes.Get(tableName) + if !ok { + return fmt.Errorf("table type not found for table: %v", tableName) + } + + if tableType == COLOCATED { + colocatedCount++ + } + } + colocatedWeight := 1 + // if all tables are sharded, each gets an equal weight of 1. + // otherwise, the weight of a sharded table = the combined weight of all colocated tables. + shardedWeight := lo.Ternary(colocatedCount == 0, 1, colocatedCount*colocatedWeight) + + choices := []weightedrand.Choice[sqlname.NameTuple, int]{} + for _, tableName := range tableNames { + tableType, ok := c.tableTypes.Get(tableName) + if !ok { + return fmt.Errorf("table type not found for table: %v", tableName) + } + if tableType == COLOCATED { + choices = append(choices, weightedrand.NewChoice(tableName, colocatedWeight)) + } else { + choices = append(choices, weightedrand.NewChoice(tableName, shardedWeight)) + } + + } + var err error + c.tableChooser, err = weightedrand.NewChooser(choices...) + if err != nil { + return fmt.Errorf("creating chooser: %w", err) + } + return nil +} + +func (c *ColocatedAwareRandomTaskPicker) MarkTaskAsDone(task *ImportFileTask) error { + for i, t := range c.inProgressTasks { + if t.ID == task.ID { + c.inProgressTasks = append(c.inProgressTasks[:i], c.inProgressTasks[i+1:]...) + c.doneTasks = append(c.doneTasks, task) + log.Infof("Marked task as done: %v. 
In-Progress tasks:%v", t, c.inProgressTasks) + return nil + } + } + return fmt.Errorf("task [%v] not found in inProgressTasks: %v", task, c.inProgressTasks) +} + +func (c *ColocatedAwareRandomTaskPicker) HasMoreTasks() bool { + if len(c.inProgressTasks) > 0 { + return true + } + + pendingTasks := false + c.tableWisePendingTasks.IterKV(func(tableName sqlname.NameTuple, tasks []*ImportFileTask) (bool, error) { + if len(tasks) > 0 { + pendingTasks = true + return false, nil + } + return true, nil + }) + + return pendingTasks +} + +func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() { + // no wait + return +} diff --git a/yb-voyager/cmd/importDataFileTaskPicker_test.go b/yb-voyager/cmd/importDataFileTaskPicker_test.go index 2d9b38db88..c24a17ffc0 100644 --- a/yb-voyager/cmd/importDataFileTaskPicker_test.go +++ b/yb-voyager/cmd/importDataFileTaskPicker_test.go @@ -17,14 +17,42 @@ package cmd import ( "fmt" + "math" "os" "slices" "testing" + "github.com/mroth/weightedrand/v2" + "github.com/samber/lo" "github.com/stretchr/testify/assert" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" testutils "github.com/yugabyte/yb-voyager/yb-voyager/test/utils" ) +type dummyYb struct { + tgtdb.TargetYugabyteDB + colocatedTables []sqlname.NameTuple + shardedTables []sqlname.NameTuple +} + +func (d *dummyYb) IsTableColocated(tableName sqlname.NameTuple) (bool, error) { + for _, t := range d.colocatedTables { + if t.Key() == tableName.Key() { + return true, nil + } + } + return false, nil +} + +func (d *dummyYb) IsDBColocated() (bool, error) { + return len(d.colocatedTables) > 0, nil +} + +func (d *dummyYb) ImportBatch(batch tgtdb.Batch, args *tgtdb.ImportBatchArgs, exportDir string, tableSchema map[string]map[string]string) (int64, error) { + return 1, nil +} + func TestSequentialTaskPickerBasic(t *testing.T) { ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) testutils.FatalIfError(t, err) @@ -51,7 +79,7 @@ func TestSequentialTaskPickerBasic(t *testing.T) { assert.True(t, picker.HasMoreTasks()) - // no matter how many times we call NextTask, it should return the same task (first task) + // no matter how many times we call Pick, it should return the same task (first task) for i := 0; i < 10; i++ { task, err := picker.Pick() assert.NoError(t, err) @@ -85,7 +113,7 @@ func TestSequentialTaskPickerMarkTaskDone(t *testing.T) { assert.True(t, picker.HasMoreTasks()) - // no matter how many times we call NextTask, it should return the same task (first task) + // no matter how many times we call Pick, it should return the same task (first task) for i := 0; i < 10; i++ { task, err := picker.Pick() assert.NoError(t, err) @@ -190,3 +218,1008 @@ func TestSequentialTaskPickerResumePicksInProgressTask(t *testing.T) { err = picker.MarkTaskAsDone(task1) assert.Error(t, err) } + +func TestColocatedAwareRandomTaskPickerAdheresToMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated1", 2) + testutils.FatalIfError(t, err) + _, 
colocatedTask2, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated2", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + picker, err := NewColocatedAwareRandomTaskPicker(2, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // initially because maxTasksInProgress is 2, we will get 2 different tasks + pickedTask1, err := picker.Pick() + assert.NoError(t, err) + pickedTask2, err := picker.Pick() + assert.NoError(t, err) + assert.NotEqual(t, pickedTask1, pickedTask2) + + // no matter how many times we call Pick thereafter, + // it should return either pickedTask1 or pickedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask1 || task == pickedTask2, "task: %v, pickedTask1: %v, pickedTask2: %v", task, pickedTask1, pickedTask2) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(pickedTask1) + assert.NoError(t, err) + + // keep picking tasks until we get a task that is not pickedTask2 + var pickedTask3 *ImportFileTask + for { + task, err := picker.Pick() + if err != nil { + break + } + if task != pickedTask2 { + pickedTask3 = task + break + } + } + + // now, next task should be either pickedTask2 or pickedTask3 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask2 || task == pickedTask3, "task: %v, pickedTask2: %v, pickedTask3: %v", task, pickedTask2, pickedTask3) + } + + // mark task3 as done + err = picker.MarkTaskAsDone(pickedTask3) + assert.NoError(t, err) + + // now, next task should be pickedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask2, "task: %v, pickedTask2: %v", task, pickedTask2) + } + + // mark task2 as done + err = picker.MarkTaskAsDone(pickedTask2) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerMultipleTasksPerTableAdheresToMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + // multiple tasks for same table.
+ _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated1", 3) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file4", ldataDir, "public.colocated2", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + shardedTask2, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + }, + } + + picker, err := NewColocatedAwareRandomTaskPicker(2, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // initially because maxTasksInProgress is 2, we will get 2 different tasks + pickedTask1, err := picker.Pick() + assert.NoError(t, err) + pickedTask2, err := picker.Pick() + assert.NoError(t, err) + assert.NotEqual(t, pickedTask1, pickedTask2) + + // no matter how many times we call Pick thereafter, + // it should return either pickedTask1 or pickedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask1 || task == pickedTask2, "task: %v, pickedTask1: %v, pickedTask2: %v", task, pickedTask1, pickedTask2) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(pickedTask1) + assert.NoError(t, err) + + // keep picking tasks until we get a task that is not pickedTask2 + var pickedTask3 *ImportFileTask + for { + task, err := picker.Pick() + if err != nil { + break + } + if task != pickedTask2 && task != pickedTask1 { + pickedTask3 = task + break + } + } + + // now, next task should be either pickedTask2 or pickedTask3 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask2 || task == pickedTask3, "task: %v, pickedTask2: %v, pickedTask3: %v", task, pickedTask2, pickedTask3) + } + + // mark task2, task3 as done + err = picker.MarkTaskAsDone(pickedTask2) + assert.NoError(t, err) + err = picker.MarkTaskAsDone(pickedTask3) + assert.NoError(t, err) + + // now, next task should be pickedTask4, which is not one of the previous tasks + pickedTask4, err := picker.Pick() + assert.NoError(t, err) + assert.NotContains(t, []*ImportFileTask{pickedTask1, pickedTask2, pickedTask3}, pickedTask4) + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask4, "task: %v, pickedTask4: %v", task, pickedTask4) + } + + // mark task4 as done + err = picker.MarkTaskAsDone(pickedTask4) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerSingleTask(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + + tasks := 
[]*ImportFileTask{ + shardedTask1, + } + dummyYb := &dummyYb{ + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + picker, err := NewColocatedAwareRandomTaskPicker(2, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + pickedTask1, err := picker.Pick() + assert.NoError(t, err) + + // no matter how many times we call Pick thereafter, + // it should return pickedTask1 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == pickedTask1, "task: %v, pickedTask1: %v", task, pickedTask1) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(pickedTask1) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerTasksEqualToMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated1", 2) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated2", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + // 3 tasks, 3 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(3, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == shardedTask1 || task == colocatedTask1 || task == colocatedTask2, "task: %v, expected tasks = %v", task, tasks) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(shardedTask1) + assert.NoError(t, err) + + // now, next task should be either colocatedTask1 or colocatedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2, "task: %v, colocatedTask1: %v, colocatedTask2: %v", task, colocatedTask1, colocatedTask2) + } + + // mark colocatedTask1, colocatedTask2 as done + err = picker.MarkTaskAsDone(colocatedTask1) + assert.NoError(t, err) + err = picker.MarkTaskAsDone(colocatedTask2) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerMultipleTasksPerTableTasksEqualToMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, 
shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated1", 2) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated1", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + // 3 tasks, 3 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(3, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == shardedTask1 || task == colocatedTask1 || task == colocatedTask2, "task: %v, expected tasks = %v", task, tasks) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(shardedTask1) + assert.NoError(t, err) + + // now, next task should be either colocatedTask1 or colocatedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2, "task: %v, colocatedTask1: %v, colocatedTask2: %v", task, colocatedTask1, colocatedTask2) + } + + // mark colocatedTask1, colocatedTask2 as done + err = picker.MarkTaskAsDone(colocatedTask1) + assert.NoError(t, err) + err = picker.MarkTaskAsDone(colocatedTask2) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerTasksLessThanMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated1", 2) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated2", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == shardedTask1 || task == colocatedTask1 || task == colocatedTask2, "task: %v, expected tasks = %v", task, tasks) + } + + // mark task1 as done + err = 
picker.MarkTaskAsDone(shardedTask1) + assert.NoError(t, err) + + // now, next task should be either colocatedTask1 or colocatedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2, "task: %v, colocatedTask1: %v, colocatedTask2: %v", task, colocatedTask1, colocatedTask2) + } + + // mark colocatedTask1, colocatedTask2 as done + err = picker.MarkTaskAsDone(colocatedTask1) + assert.NoError(t, err) + err = picker.MarkTaskAsDone(colocatedTask2) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerMultipleTasksPerTableTasksLessThanMaxTasksInProgress(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask1, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated1", 2) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated1", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + colocatedTask1, + colocatedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == shardedTask1 || task == colocatedTask1 || task == colocatedTask2, "task: %v, expected tasks = %v", task, tasks) + } + + // mark task1 as done + err = picker.MarkTaskAsDone(shardedTask1) + assert.NoError(t, err) + + // now, next task should be either colocatedTask1 or colocatedTask2 + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2, "task: %v, colocatedTask1: %v, colocatedTask2: %v", task, colocatedTask1, colocatedTask2) + } + + // mark colocatedTask1, colocatedTask2 as done + err = picker.MarkTaskAsDone(colocatedTask1) + assert.NoError(t, err) + err = picker.MarkTaskAsDone(colocatedTask2) + assert.NoError(t, err) + + // now, there should be no more tasks + assert.False(t, picker.HasMoreTasks()) + _, err = picker.Pick() + assert.Error(t, err) +} + +func TestColocatedAwareRandomTaskPickerAllShardedTasks(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := 
createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + testutils.FatalIfError(t, err) + _, shardedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.sharded3", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + shardedTask2, + shardedTask3, + } + dummyYb := &dummyYb{ + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + shardedTask3.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == shardedTask1 || task == shardedTask2 || task == shardedTask3, "task: %v, expected tasks = %v", task, tasks) + } +} + +func TestColocatedAwareRandomTaskPickerAllShardedTasksChooser(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + testutils.FatalIfError(t, err) + _, shardedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.sharded3", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + shardedTask1, + shardedTask2, + shardedTask3, + } + dummyYb := &dummyYb{ + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + shardedTask3.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + picker.initializeChooser() + // all sharded tables should have equal probability of being picked + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, dummyYb.colocatedTables, dummyYb.shardedTables) + + // now pick one task. After picking that task, the chooser should have updated probabilities + task, err := picker.Pick() + assert.NoError(t, err) + + updatedPendingColocatedTables := lo.Filter(dummyYb.colocatedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + updatedPendingShardedTables := lo.Filter(dummyYb.shardedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, updatedPendingColocatedTables, updatedPendingShardedTables) + +} + +func assertTableChooserPicksShardedAndColocatedAsExpected(t *testing.T, chooser *weightedrand.Chooser[sqlname.NameTuple, int], + expectedColocatedTableNames []sqlname.NameTuple, expectedShardedTableNames []sqlname.NameTuple) { + tableNameTuples := append(expectedColocatedTableNames, expectedShardedTableNames...)
+ colocatedPickCounter := map[sqlname.NameTuple]int{} + shardedPickCounter := map[sqlname.NameTuple]int{} + + for i := 0; i < 100000; i++ { + tableName := chooser.Pick() + assert.Contains(t, tableNameTuples, tableName) + if slices.Contains(expectedColocatedTableNames, tableName) { + colocatedPickCounter[tableName]++ + } else { + shardedPickCounter[tableName]++ + } + } + fmt.Printf("colocatedPickCounter: %v\n", colocatedPickCounter) + fmt.Printf("shardedPickCounter: %v\n", shardedPickCounter) + totalColocatedTables := len(expectedColocatedTableNames) + totalShardedTables := len(expectedShardedTableNames) + + assert.Equal(t, totalColocatedTables, len(colocatedPickCounter)) + assert.Equal(t, totalShardedTables, len(shardedPickCounter)) + + // assert that all colocated tables have been picked almost equal number of times + totalColocatedPicks := 0 + for _, v := range colocatedPickCounter { + totalColocatedPicks += v + } + if totalColocatedPicks > 0 { + expectedCountForEachColocatedTable := totalColocatedPicks / totalColocatedTables + for _, v := range colocatedPickCounter { + diff := math.Abs(float64(v - expectedCountForEachColocatedTable)) + diffPct := float64(diff) / float64(expectedCountForEachColocatedTable) * 100 + // pct difference from expected count should be less than 5% + assert.Truef(t, diffPct < 5, "diff: %v, diffPct: %v", diff, diffPct) + } + } + + // assert that all sharded tables have been picked almost equal number of times + totalShardedPicks := 0 + for _, v := range shardedPickCounter { + totalShardedPicks += v + } + if totalShardedPicks > 0 { + expectedCountForEachShardedTable := totalShardedPicks / totalShardedTables + for _, v := range shardedPickCounter { + diff := math.Abs(float64(v - expectedCountForEachShardedTable)) + diffPct := float64(diff) / float64(expectedCountForEachShardedTable) * 100 + // pct difference from expected count should be less than 5% + assert.Truef(t, diffPct < 5, "diff: %v, diffPct: %v", diff, diffPct) + } + } + + // assert that sum of probability of picking any of the colocated tables should be same as probability of picking any of the sharded tables + if totalColocatedPicks > 0 && totalShardedPicks > 0 { + for _, v := range shardedPickCounter { + diff := math.Abs(float64(v - totalColocatedPicks)) + diffPct := float64(diff) / float64(totalColocatedPicks) * 100 + // pct difference from expected count should be less than 5% + assert.Truef(t, diffPct < 5, "diff: %v, diffPct: %v", diff, diffPct) + } + } +} + +func TestColocatedAwareRandomTaskPickerAllColocatedTasks(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, colocatedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.colocated1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated3", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + colocatedTask1, + colocatedTask2, + colocatedTask3, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + colocatedTask3.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, 
err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2 || task == colocatedTask3, "task: %v, expected tasks = %v", task, tasks) + } +} + +func TestColocatedAwareRandomTaskPickerAllColocatedTasksChooser(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, colocatedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.colocated1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated3", 3) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + colocatedTask1, + colocatedTask2, + colocatedTask3, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + colocatedTask3.TableNameTup, + }, + } + + // 3 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // all colocated tables should have equal probability of being picked + picker.initializeChooser() + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, dummyYb.colocatedTables, dummyYb.shardedTables) + + // now pick one task. After picking that task, the chooser should have updated probabilities + task, err := picker.Pick() + assert.NoError(t, err) + + updatedPendingColocatedTables := lo.Filter(dummyYb.colocatedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + updatedPendingShardedTables := lo.Filter(dummyYb.shardedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, updatedPendingColocatedTables, updatedPendingShardedTables) +} + +func TestColocatedAwareRandomTaskPickerMixShardedColocatedTasks(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, colocatedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.colocated1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated3", 3) + testutils.FatalIfError(t, err) + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + colocatedTask1, + colocatedTask2, + colocatedTask3, + shardedTask1, + shardedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + colocatedTask3.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + }, + } + + // 5 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // no matter how many times we call Pick thereafter, + // it should return one of the tasks + for i := 0; i < 100; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + assert.Truef(t, task == colocatedTask1 || task == colocatedTask2 || task == colocatedTask3 || task == shardedTask1 || task == shardedTask2, "task: %v, expected tasks = %v", task, tasks) + } +} + +func TestColocatedAwareRandomTaskPickerMixShardedColocatedTasksChooser(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, colocatedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.colocated1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated3", 3) + testutils.FatalIfError(t, err) + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + 
testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + colocatedTask1, + colocatedTask2, + colocatedTask3, + shardedTask1, + shardedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + colocatedTask3.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + }, + } + + // 5 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // all colocated tables should have same probability of being picked + // all sharded tables should have same probability of being picked + // sum of probability of picking any of the colocated tables should be same as probability of picking any of the sharded tables + picker.initializeChooser() + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, dummyYb.colocatedTables, dummyYb.shardedTables) + + updatedPendingColocatedTables := dummyYb.colocatedTables + updatedPendingShardedTables := dummyYb.shardedTables + + // now pick tasks one by one. After picking each task, the chooser should have updated probabilities + for i := 0; i < 4; i++ { + task, err := picker.Pick() + assert.NoError(t, err) + + updatedPendingColocatedTables = lo.Filter(updatedPendingColocatedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + updatedPendingShardedTables = lo.Filter(updatedPendingShardedTables, func(t sqlname.NameTuple, _ int) bool { + return t != task.TableNameTup + }) + + assertTableChooserPicksShardedAndColocatedAsExpected(t, picker.tableChooser, updatedPendingColocatedTables, updatedPendingShardedTables) + } +} + +func TestColocatedAwareRandomTaskPickerResumable(t *testing.T) { + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) + testutils.FatalIfError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + _, colocatedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.colocated1", 1) + testutils.FatalIfError(t, err) + _, colocatedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.colocated2", 2) + testutils.FatalIfError(t, err) + _, colocatedTask3, err := createFileAndTask(lexportDir, "file3", ldataDir, "public.colocated3", 3) + testutils.FatalIfError(t, err) + _, shardedTask1, err := createFileAndTask(lexportDir, "file1", ldataDir, "public.sharded1", 1) + testutils.FatalIfError(t, err) + _, shardedTask2, err := createFileAndTask(lexportDir, "file2", ldataDir, "public.sharded2", 2) + testutils.FatalIfError(t, err) + + tasks := []*ImportFileTask{ + colocatedTask1, + colocatedTask2, + colocatedTask3, + shardedTask1, + shardedTask2, + } + dummyYb := &dummyYb{ + colocatedTables: []sqlname.NameTuple{ + colocatedTask1.TableNameTup, + colocatedTask2.TableNameTup, + colocatedTask3.TableNameTup, + }, + shardedTables: []sqlname.NameTuple{ + shardedTask1.TableNameTup, + shardedTask2.TableNameTup, + }, + } + + // 5 tasks, 10 max tasks in progress + picker, err := NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + + // pick 3 tasks and start the process. 
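+ // Producing a batch and marking it in-progress persists the file import state, so each freshly constructed picker below can classify these tasks as FILE_IMPORT_IN_PROGRESS after the simulated restart.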
+ task1, err := picker.Pick() + assert.NoError(t, err) + fbp1, err := NewFileBatchProducer(task1, state) + assert.NoError(t, err) + batch1, err := fbp1.NextBatch() + assert.NoError(t, err) + batch1.MarkInProgress() + + task2, err := picker.Pick() + assert.NoError(t, err) + fbp2, err := NewFileBatchProducer(task2, state) + assert.NoError(t, err) + batch2, err := fbp2.NextBatch() + assert.NoError(t, err) + batch2.MarkInProgress() + + task3, err := picker.Pick() + assert.NoError(t, err) + fbp3, err := NewFileBatchProducer(task3, state) + assert.NoError(t, err) + batch3, err := fbp3.NextBatch() + assert.NoError(t, err) + batch3.MarkInProgress() + + // simulate restart. now, those 3 tasks should be in progress + picker, err = NewColocatedAwareRandomTaskPicker(10, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + assert.Equal(t, 3, len(picker.inProgressTasks)) + + // simulate restart with a larger no. of max tasks in progress + picker, err = NewColocatedAwareRandomTaskPicker(20, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + assert.Equal(t, 3, len(picker.inProgressTasks)) + + // simulate restart with a smaller no. of max tasks in progress + picker, err = NewColocatedAwareRandomTaskPicker(2, tasks, state, dummyYb) + testutils.FatalIfError(t, err) + assert.True(t, picker.HasMoreTasks()) + // only two should be in progress even though previously 3 were in progress. + assert.Equal(t, 2, len(picker.inProgressTasks)) +} diff --git a/yb-voyager/go.mod b/yb-voyager/go.mod index 716310a4a1..6dd1066590 100644 --- a/yb-voyager/go.mod +++ b/yb-voyager/go.mod @@ -74,6 +74,7 @@ require ( github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mroth/weightedrand/v2 v2.1.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/yb-voyager/go.sum b/yb-voyager/go.sum index 62c999553d..8e3b8405a7 100644 --- a/yb-voyager/go.sum +++ b/yb-voyager/go.sum @@ -1605,6 +1605,8 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU= +github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= diff --git a/yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz b/yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz index c4f54749be..bc8d7b6a16 100644 Binary files a/yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz and b/yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz differ diff --git a/yb-voyager/src/tgtdb/yugabytedb.go b/yb-voyager/src/tgtdb/yugabytedb.go index 7c4e179f85..181386c9e7 100644 --- a/yb-voyager/src/tgtdb/yugabytedb.go +++ b/yb-voyager/src/tgtdb/yugabytedb.go @@ -1228,6 +1228,20 @@ func (yb *TargetYugabyteDB) 
isQueryResultNonEmpty(query string) bool { return rows.Next() } +func (yb *TargetYugabyteDB) IsDBColocated() (bool, error) { + query := "select yb_is_database_colocated()" + var isColocated bool + err := yb.QueryRow(query).Scan(&isColocated) + return isColocated, err +} + +func (yb *TargetYugabyteDB) IsTableColocated(tableName sqlname.NameTuple) (bool, error) { + query := fmt.Sprintf("select is_colocated from yb_table_properties('%s'::regclass)", tableName.ForUserQuery()) + var isTableColocated bool + err := yb.QueryRow(query).Scan(&isTableColocated) + return isTableColocated, err +} + func (yb *TargetYugabyteDB) IsAdaptiveParallelismSupported() bool { query := "SELECT * FROM pg_proc WHERE proname='yb_servers_metrics'" return yb.isQueryResultNonEmpty(query)
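Closing note: the weighting scheme that initializeChooser builds can be sanity-checked standalone with the same mroth/weightedrand/v2 package this PR pulls in. A sketch (the table names here are made up for illustration):

```go
package main

import (
	"fmt"

	weightedrand "github.com/mroth/weightedrand/v2"
)

func main() {
	// Mirror initializeChooser: 4 colocated tables at weight 1 each, and
	// 4 sharded tables at weight 4 (= the combined colocated weight) each.
	var choices []weightedrand.Choice[string, int]
	for i := 1; i <= 4; i++ {
		choices = append(choices, weightedrand.NewChoice(fmt.Sprintf("colocated%d", i), 1))
		choices = append(choices, weightedrand.NewChoice(fmt.Sprintf("sharded%d", i), 4))
	}
	chooser, err := weightedrand.NewChooser(choices...)
	if err != nil {
		panic(err)
	}

	counts := map[string]int{}
	for i := 0; i < 100_000; i++ {
		counts[chooser.Pick()]++
	}
	// Expect each sharded table near 20% (4/20 = 0.2) and each colocated
	// table near 5% (1/20 = 0.05), matching the doc comment on the picker.
	fmt.Println(counts)
}
```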