Skip to content

Commit

Permalink
doc and WaitForTasksBatchesTobeImported
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh committed Mar 5, 2025
1 parent fe702a1 commit adaeff9
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions yb-voyager/cmd/importDataFileTaskPicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,22 @@ func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() error
return nil
}

/*
The goal of this picker is to pick a combination of colocated and sharded tasks, both at random.
The limits in place are maxShardedTasksInProgress, maxColocatedTasksInProgress and colocatedBatchTaskQueue.
Colocated tasks are limited by single tablet performance limits on YB, so we have to constrain the no. of colocated
batches that can be ingested at a time. THis is achieved by having a max parallel limit on the consumer of colocatedBatchTaskQueue.
Therefore, colocated tasks are prioritized and picked but only if colocatedBatchTaskQueue has space.
If colocatedBatchTaskQueue is full, then sharded tasks are picked.
*/
type ColocatedCappedRandomTaskPicker struct {
state *ImportDataState

maxShardedTasksInProgress int
maxColocatedTasksInProgress int
colocatedBatchTaskQueue chan func()
tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded

doneTasks []*ImportFileTask
// tasks which the picker has picked at least once, and are essentially in progress.
Expand All @@ -423,9 +436,6 @@ type ColocatedCappedRandomTaskPicker struct {
// tasks which have not yet been picked even once.
pendingShardedTasks []*ImportFileTask
pendingColocatedTasks []*ImportFileTask

tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded
colocatedBatchTaskQueue chan func()
}

func NewColocatedCappedRandomTaskPicker(maxShardedTasksInProgress int, maxColocatedTasksInProgress int, tasks []*ImportFileTask, state *ImportDataState, yb YbTargetDBColocatedChecker, colocatedBatchTaskQueue chan func()) (*ColocatedCappedRandomTaskPicker, error) {
Expand Down Expand Up @@ -509,6 +519,7 @@ func NewColocatedCappedRandomTaskPicker(maxShardedTasksInProgress int, maxColoca
}

picker := &ColocatedCappedRandomTaskPicker{
state: state,
maxShardedTasksInProgress: maxShardedTasksInProgress,
maxColocatedTasksInProgress: maxColocatedTasksInProgress,

Expand Down Expand Up @@ -564,11 +575,9 @@ func (c *ColocatedCappedRandomTaskPicker) Pick() (*ImportFileTask, error) {

// if only one type of tasks are left, pick from that type.
if !c.HasMoreShardedTasks() {
log.Info("picking colocated task")
return c.pickColocatedTask()
}
if !c.HasMoreColocatedTasks() {
log.Info("picking sharded task")
return c.pickShardedTask()
}

Expand All @@ -591,12 +600,10 @@ func (c *ColocatedCappedRandomTaskPicker) Pick() (*ImportFileTask, error) {

// pick from the combination of in-progress tasks.
// if we can push a new colocated task into the queue, pick a colocated task.
log.Infof("colocatedBatchTaskQueue: %v, cap: %v", len(c.colocatedBatchTaskQueue), cap(c.colocatedBatchTaskQueue))
log.Debugf("colocatedBatchTaskQueue: %v, cap: %v", len(c.colocatedBatchTaskQueue), cap(c.colocatedBatchTaskQueue))
if len(c.colocatedBatchTaskQueue) < cap(c.colocatedBatchTaskQueue) {
log.Info("picking colocated task")
return c.pickInProgressColocatedTask()
}
log.Info("picking sharded task")
return c.pickInProgressShardedTask()
}

Expand Down Expand Up @@ -624,6 +631,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingColocatedTask() (*Import
taskIndex, pickedTask := c.pickRandomFromListOfTasks(c.pendingColocatedTasks)
c.pendingColocatedTasks = append(c.pendingColocatedTasks[:taskIndex], c.pendingColocatedTasks[taskIndex+1:]...)
c.inProgressColocatedTasks = append(c.inProgressColocatedTasks, pickedTask)
log.Debugf("picking pending colocated task: %v", pickedTask)
return pickedTask, nil
}
}
Expand All @@ -633,6 +641,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingColocatedTask() (*Import
func (c *ColocatedCappedRandomTaskPicker) pickInProgressColocatedTask() (*ImportFileTask, error) {
if len(c.inProgressColocatedTasks) > 0 {
_, pickedTask := c.pickRandomFromListOfTasks(c.inProgressColocatedTasks)
log.Debugf("picking in-progress colocated task: %v", pickedTask)
return pickedTask, nil
}
return nil, fmt.Errorf("no in-progress colocated tasks to pick from")
Expand Down Expand Up @@ -662,6 +671,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingShardedTask() (*ImportFi
taskIndex, pickedTask := c.pickRandomFromListOfTasks(c.pendingShardedTasks)
c.pendingShardedTasks = append(c.pendingShardedTasks[:taskIndex], c.pendingShardedTasks[taskIndex+1:]...)
c.inProgressShardedTasks = append(c.inProgressShardedTasks, pickedTask)
log.Debugf("picking pending sharded task: %v", pickedTask)
return pickedTask, nil
}
}
Expand All @@ -671,6 +681,7 @@ func (c *ColocatedCappedRandomTaskPicker) tryPickPendingShardedTask() (*ImportFi
func (c *ColocatedCappedRandomTaskPicker) pickInProgressShardedTask() (*ImportFileTask, error) {
if len(c.inProgressShardedTasks) > 0 {
_, pickedTask := c.pickRandomFromListOfTasks(c.inProgressShardedTasks)
log.Debugf("picking in-progress sharded task: %v", pickedTask)
return pickedTask, nil
}
return nil, fmt.Errorf("no in-progress sharded tasks to pick from")
Expand All @@ -696,6 +707,23 @@ func (c *ColocatedCappedRandomTaskPicker) MarkTaskAsDone(task *ImportFileTask) e
}

func (c *ColocatedCappedRandomTaskPicker) WaitForTasksBatchesTobeImported() error {
// no wait
// if for all in-progress tasks, all batches are submitted, then sleep for a bit
allTasksAllBatchesSubmitted := true

for _, task := range c.inProgressTasks() {
taskAllBatchesSubmitted, err := c.state.AllBatchesSubmittedForTask(task.ID)
if err != nil {
return fmt.Errorf("checking if all batches are submitted for task: %v: %w", task, err)
}
if !taskAllBatchesSubmitted {
allTasksAllBatchesSubmitted = false
break
}
}

if allTasksAllBatchesSubmitted {
log.Infof("All batches submitted for all in-progress tasks. Sleeping")
time.Sleep(time.Millisecond * 100)
}
return nil
}

0 comments on commit adaeff9

Please sign in to comment.