diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 99dd6432c0..36b07aebbb 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -714,6 +714,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS // `parallelism` number of batches at a time. batchImportPool = pool.New().WithMaxGoroutines(poolSize) log.Infof("created batch import pool of size: %d", poolSize) + taskImporters := map[int]*FileTaskImporter{} var taskPicker FileTaskPicker var err error @@ -733,8 +734,6 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS } } - taskImporters := map[int]*FileTaskImporter{} - for taskPicker.HasMoreTasks() { task, err := taskPicker.Pick() if err != nil { @@ -758,7 +757,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS // task could have been completed (all batches imported) OR still in progress // in case task is done, we should inform task picker so that we stop picking that task. log.Infof("All batches submitted for task: %s", task) - taskDone, err := taskImporter.AllBatchesImported() + taskDone, err := state.AllBatchesImported(task.FilePath, task.TableNameTup) if err != nil { return fmt.Errorf("check if all batches are imported: task: %v err :%w", task, err) } @@ -768,12 +767,16 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS if err != nil { return fmt.Errorf("mark task as done: task: %v, err: %w", task, err) } + state.UnregisterFileTaskImporter(taskImporter) log.Infof("Import of task done: %s", task) continue } else { // some batches are still in progress, wait for them to complete as decided by the picker. // don't want to busy-wait, so in case of sequentialTaskPicker, we sleep. - taskPicker.WaitForTasksBatchesTobeImported() + err := taskPicker.WaitForTasksBatchesTobeImported() + if err != nil { + return fmt.Errorf("wait for tasks batches to be imported: %w", err) + } continue } diff --git a/yb-voyager/cmd/importDataFileTaskImporter.go b/yb-voyager/cmd/importDataFileTaskImporter.go index 01625ca451..ad66550934 100644 --- a/yb-voyager/cmd/importDataFileTaskImporter.go +++ b/yb-voyager/cmd/importDataFileTaskImporter.go @@ -37,7 +37,6 @@ worker pool for processing. It also maintains and updates the progress of the ta */ type FileTaskImporter struct { task *ImportFileTask - state *ImportDataState batchProducer *FileBatchProducer importBatchArgsProto *tgtdb.ImportBatchArgs workerPool *pool.Pool @@ -60,7 +59,6 @@ func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPoo fti := &FileTaskImporter{ task: task, - state: state, batchProducer: batchProducer, workerPool: workerPool, importBatchArgsProto: getImportBatchArgsProto(task.TableNameTup, task.FilePath), @@ -68,9 +66,14 @@ func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPoo totalProgressAmount: totalProgressAmount, currentProgressAmount: currentProgressAmount, } + state.RegisterFileTaskImporter(fti) return fti, nil } +func (fti *FileTaskImporter) GetTaskID() int { + return fti.task.ID +} + // as of now, batch production and batch submission // is done together in `SubmitNextBatch` method. // In other words, as soon as a batch is produced, it is submitted. @@ -80,14 +83,6 @@ func (fti *FileTaskImporter) AllBatchesSubmitted() bool { return fti.batchProducer.Done() } -func (fti *FileTaskImporter) AllBatchesImported() (bool, error) { - taskStatus, err := fti.state.GetFileImportState(fti.task.FilePath, fti.task.TableNameTup) - if err != nil { - return false, fmt.Errorf("getting file import state: %s", err) - } - return taskStatus == FILE_IMPORT_COMPLETED, nil -} - func (fti *FileTaskImporter) ProduceAndSubmitNextBatchToWorkerPool() error { if fti.AllBatchesSubmitted() { return fmt.Errorf("no more batches to submit") diff --git a/yb-voyager/cmd/importDataFileTaskPicker.go b/yb-voyager/cmd/importDataFileTaskPicker.go index cac3b524eb..f01b7064c1 100644 --- a/yb-voyager/cmd/importDataFileTaskPicker.go +++ b/yb-voyager/cmd/importDataFileTaskPicker.go @@ -38,7 +38,7 @@ type FileTaskPicker interface { Pick() (*ImportFileTask, error) MarkTaskAsDone(task *ImportFileTask) error HasMoreTasks() bool - WaitForTasksBatchesTobeImported() + WaitForTasksBatchesTobeImported() error } /* @@ -90,6 +90,7 @@ func (s *SequentialTaskPicker) Pick() (*ImportFileTask, error) { s.inProgressTask = s.pendingTasks[0] s.pendingTasks = s.pendingTasks[1:] } + return s.inProgressTask, nil } @@ -114,11 +115,12 @@ func (s *SequentialTaskPicker) HasMoreTasks() bool { return len(s.pendingTasks) > 0 } -func (s *SequentialTaskPicker) WaitForTasksBatchesTobeImported() { +func (s *SequentialTaskPicker) WaitForTasksBatchesTobeImported() error { // Consider the scenario where we have a single task in progress and all batches are submitted, but not yet ingested. // In this case as per SequentialTaskPicker's implementation, it will wait for the task to be marked as done. // Instead of having a busy-loop where we keep checking if the task is done, we can wait for a second and then check again. time.Sleep(time.Second * 1) + return nil } /* @@ -155,6 +157,7 @@ type ColocatedAwareRandomTaskPicker struct { tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded + state *ImportDataState } type YbTargetDBColocatedChecker interface { @@ -230,6 +233,7 @@ func NewColocatedAwareRandomTaskPicker(maxTasksInProgress int, tasks []*ImportFi maxTasksInProgress: maxTasksInProgress, tableWisePendingTasks: tableWisePendingTasks, tableTypes: tableTypes, + state: state, } if len(picker.tableWisePendingTasks.Keys()) > 0 { err = picker.initializeChooser() @@ -384,7 +388,24 @@ func (c *ColocatedAwareRandomTaskPicker) HasMoreTasks() bool { return pendingTasks } -func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() { - // no wait - return +func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() error { + // if for all in-progress tasks, all batches are submitted, then sleep for a bit + allTasksAllBatchesSubmitted := true + + for _, task := range c.inProgressTasks { + taskAllBatchesSubmitted, err := c.state.AllBatchesSubmittedForTask(task.ID) + if err != nil { + return fmt.Errorf("checking if all batches are submitted for task: %v: %w", task, err) + } + if !taskAllBatchesSubmitted { + allTasksAllBatchesSubmitted = false + break + } + } + + if allTasksAllBatchesSubmitted { + log.Infof("All batches submitted for all in-progress tasks. Sleeping") + time.Sleep(time.Millisecond * 100) + } + return nil } diff --git a/yb-voyager/cmd/importDataState.go b/yb-voyager/cmd/importDataState.go index 751bd400de..55abf53190 100644 --- a/yb-voyager/cmd/importDataState.go +++ b/yb-voyager/cmd/importDataState.go @@ -55,17 +55,24 @@ metainfo/import_data_state/table::/file:::/ batch::.... */ type ImportDataState struct { - exportDir string - stateDir string + exportDir string + stateDir string + inProgressTaskImporters map[int]fileTaskImportStatusChecker // used to fetch in-memory status from FileTaskImporter } func NewImportDataState(exportDir string) *ImportDataState { return &ImportDataState{ - exportDir: exportDir, - stateDir: filepath.Join(exportDir, "metainfo", "import_data_state", importerRole), + exportDir: exportDir, + stateDir: filepath.Join(exportDir, "metainfo", "import_data_state", importerRole), + inProgressTaskImporters: make(map[int]fileTaskImportStatusChecker), } } +type fileTaskImportStatusChecker interface { + GetTaskID() int + AllBatchesSubmitted() bool +} + func (s *ImportDataState) PrepareForFileImport(filePath string, tableNameTup sqlname.NameTuple) error { fileStateDir := s.getFileStateDir(filePath, tableNameTup) log.Infof("Creating %q.", fileStateDir) @@ -647,6 +654,30 @@ func (s *ImportDataState) GetImportedEventsStatsForTableList(tableNameTupList [] return tablesToEventCounter, nil } +func (s *ImportDataState) RegisterFileTaskImporter(importer fileTaskImportStatusChecker) { + s.inProgressTaskImporters[importer.GetTaskID()] = importer +} + +func (s *ImportDataState) UnregisterFileTaskImporter(importer fileTaskImportStatusChecker) { + delete(s.inProgressTaskImporters, importer.GetTaskID()) +} + +func (s *ImportDataState) AllBatchesSubmittedForTask(taskId int) (bool, error) { + taskImporter, ok := s.inProgressTaskImporters[taskId] + if !ok { + return false, fmt.Errorf("task importer with id %d not registered", taskId) + } + return taskImporter.AllBatchesSubmitted(), nil +} + +func (s *ImportDataState) AllBatchesImported(filepath string, tableNameTup sqlname.NameTuple) (bool, error) { + taskStatus, err := s.GetFileImportState(filepath, tableNameTup) + if err != nil { + return false, fmt.Errorf("getting file import state: %s", err) + } + return taskStatus == FILE_IMPORT_COMPLETED, nil +} + //============================================================================ type BatchWriter struct {