From adaeff9fd543a858457c0227d289571287e630bd Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 5 Mar 2025 10:45:18 +0530 Subject: [PATCH] doc and WaitForTasksBatchesTobeImported --- yb-voyager/cmd/importDataFileTaskPicker.go | 46 +++++++++++++++++----- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/yb-voyager/cmd/importDataFileTaskPicker.go b/yb-voyager/cmd/importDataFileTaskPicker.go index 01e9347dd..9926fcee1 100644 --- a/yb-voyager/cmd/importDataFileTaskPicker.go +++ b/yb-voyager/cmd/importDataFileTaskPicker.go @@ -410,9 +410,22 @@ func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() error return nil } +/* +The goal of this picker is to pick a combination of colocated and sharded tasks, both at random. +The limits in place are maxShardedTasksInProgress, maxColocatedTasksInProgress and colocatedBatchTaskQueue. + +Colocated tasks are limited by single tablet performance limits on YB, so we have to constrain the no. of colocated +batches that can be ingested at a time. THis is achieved by having a max parallel limit on the consumer of colocatedBatchTaskQueue. +Therefore, colocated tasks are prioritized and picked but only if colocatedBatchTaskQueue has space. +If colocatedBatchTaskQueue is full, then sharded tasks are picked. +*/ type ColocatedCappedRandomTaskPicker struct { + state *ImportDataState + maxShardedTasksInProgress int maxColocatedTasksInProgress int + colocatedBatchTaskQueue chan func() + tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded doneTasks []*ImportFileTask // tasks which the picker has picked at least once, and are essentially in progress. @@ -423,9 +436,6 @@ type ColocatedCappedRandomTaskPicker struct { // tasks which have not yet been picked even once. pendingShardedTasks []*ImportFileTask pendingColocatedTasks []*ImportFileTask - - tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded - colocatedBatchTaskQueue chan func() } func NewColocatedCappedRandomTaskPicker(maxShardedTasksInProgress int, maxColocatedTasksInProgress int, tasks []*ImportFileTask, state *ImportDataState, yb YbTargetDBColocatedChecker, colocatedBatchTaskQueue chan func()) (*ColocatedCappedRandomTaskPicker, error) { @@ -509,6 +519,7 @@ func NewColocatedCappedRandomTaskPicker(maxShardedTasksInProgress int, maxColoca } picker := &ColocatedCappedRandomTaskPicker{ + state: state, maxShardedTasksInProgress: maxShardedTasksInProgress, maxColocatedTasksInProgress: maxColocatedTasksInProgress, @@ -564,11 +575,9 @@ func (c *ColocatedCappedRandomTaskPicker) Pick() (*ImportFileTask, error) { // if only one type of tasks are left, pick from that type. if !c.HasMoreShardedTasks() { - log.Info("picking colocated task") return c.pickColocatedTask() } if !c.HasMoreColocatedTasks() { - log.Info("picking sharded task") return c.pickShardedTask() } @@ -591,12 +600,10 @@ func (c *ColocatedCappedRandomTaskPicker) Pick() (*ImportFileTask, error) { // pick from the combination of in-progress tasks. // if we can push a new colocated task into the queue, pick a colocated task. - log.Infof("colocatedBatchTaskQueue: %v, cap: %v", len(c.colocatedBatchTaskQueue), cap(c.colocatedBatchTaskQueue)) + log.Debugf("colocatedBatchTaskQueue: %v, cap: %v", len(c.colocatedBatchTaskQueue), cap(c.colocatedBatchTaskQueue)) if len(c.colocatedBatchTaskQueue) < cap(c.colocatedBatchTaskQueue) { - log.Info("picking colocated task") return c.pickInProgressColocatedTask() } - log.Info("picking sharded task") return c.pickInProgressShardedTask() } @@ -624,6 +631,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingColocatedTask() (*Import taskIndex, pickedTask := c.pickRandomFromListOfTasks(c.pendingColocatedTasks) c.pendingColocatedTasks = append(c.pendingColocatedTasks[:taskIndex], c.pendingColocatedTasks[taskIndex+1:]...) c.inProgressColocatedTasks = append(c.inProgressColocatedTasks, pickedTask) + log.Debugf("picking pending colocated task: %v", pickedTask) return pickedTask, nil } } @@ -633,6 +641,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingColocatedTask() (*Import func (c *ColocatedCappedRandomTaskPicker) pickInProgressColocatedTask() (*ImportFileTask, error) { if len(c.inProgressColocatedTasks) > 0 { _, pickedTask := c.pickRandomFromListOfTasks(c.inProgressColocatedTasks) + log.Debugf("picking in-progress colocated task: %v", pickedTask) return pickedTask, nil } return nil, fmt.Errorf("no in-progress colocated tasks to pick from") @@ -662,6 +671,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingShardedTask() (*ImportFi taskIndex, pickedTask := c.pickRandomFromListOfTasks(c.pendingShardedTasks) c.pendingShardedTasks = append(c.pendingShardedTasks[:taskIndex], c.pendingShardedTasks[taskIndex+1:]...) c.inProgressShardedTasks = append(c.inProgressShardedTasks, pickedTask) + log.Debugf("picking pending sharded task: %v", pickedTask) return pickedTask, nil } } @@ -671,6 +681,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingShardedTask() (*ImportFi func (c *ColocatedCappedRandomTaskPicker) pickInProgressShardedTask() (*ImportFileTask, error) { if len(c.inProgressShardedTasks) > 0 { _, pickedTask := c.pickRandomFromListOfTasks(c.inProgressShardedTasks) + log.Debugf("picking in-progress sharded task: %v", pickedTask) return pickedTask, nil } return nil, fmt.Errorf("no in-progress sharded tasks to pick from") @@ -696,6 +707,23 @@ func (c *ColocatedCappedRandomTaskPicker) MarkTaskAsDone(task *ImportFileTask) e } func (c *ColocatedCappedRandomTaskPicker) WaitForTasksBatchesTobeImported() error { - // no wait + // if for all in-progress tasks, all batches are submitted, then sleep for a bit + allTasksAllBatchesSubmitted := true + + for _, task := range c.inProgressTasks() { + taskAllBatchesSubmitted, err := c.state.AllBatchesSubmittedForTask(task.ID) + if err != nil { + return fmt.Errorf("checking if all batches are submitted for task: %v: %w", task, err) + } + if !taskAllBatchesSubmitted { + allTasksAllBatchesSubmitted = false + break + } + } + + if allTasksAllBatchesSubmitted { + log.Infof("All batches submitted for all in-progress tasks. Sleeping") + time.Sleep(time.Millisecond * 100) + } return nil }