colocated aware random picker #2301

Merged · 97 commits · Feb 19, 2025
Changes shown from 84 of the 97 commits.

Commits:
fa29368
wip
makalaaneesh Jan 15, 2025
ec54ea6
base logic for producing next batch
makalaaneesh Jan 15, 2025
a4aabc8
store line from previous batch
makalaaneesh Jan 15, 2025
440a5f3
test
makalaaneesh Jan 16, 2025
c14e69e
rewrite test
makalaaneesh Jan 16, 2025
a056385
minor fix
makalaaneesh Jan 16, 2025
c45b6a3
more tests
makalaaneesh Jan 16, 2025
a78387e
batch value verify
makalaaneesh Jan 16, 2025
65c7645
assert less than
makalaaneesh Jan 16, 2025
a9a294c
resumable test
makalaaneesh Jan 16, 2025
1f32f7a
import data change to use filebatchproducer
makalaaneesh Jan 16, 2025
cbc1ede
unit tag
makalaaneesh Jan 16, 2025
943bd22
cleanup
makalaaneesh Jan 20, 2025
c8390e3
wip
makalaaneesh Jan 21, 2025
3571226
filetaskimporter
makalaaneesh Jan 22, 2025
1e4b1c8
run tests
makalaaneesh Jan 22, 2025
0c2ec24
use task importer
makalaaneesh Jan 22, 2025
5dbacbd
run tests 2
makalaaneesh Jan 22, 2025
ae0f5f3
comments
makalaaneesh Jan 22, 2025
634f29c
review comments
makalaaneesh Jan 22, 2025
b2da3ff
test for when all batches are produced and we resume
makalaaneesh Jan 22, 2025
c65f112
review comments
makalaaneesh Jan 22, 2025
3fc9164
Merge branch 'main' into aneesh/import-multiple-tables-at-same-time-1
makalaaneesh Jan 22, 2025
c9b94e1
Merge branch 'aneesh/import-multiple-tables-at-same-time-1' into anee…
makalaaneesh Jan 22, 2025
ed56f33
merge main
makalaaneesh Jan 23, 2025
f9babc5
Revert "run tests 2"
makalaaneesh Jan 23, 2025
2f1e070
Revert "run tests"
makalaaneesh Jan 23, 2025
2e8d241
basic test
makalaaneesh Jan 23, 2025
92a8254
minor refactor
makalaaneesh Jan 23, 2025
66d2c15
small refactor of test utils
makalaaneesh Jan 23, 2025
1ec7e7c
Merge branch 'main' into aneesh/import-multiple-tables-at-same-time-2
makalaaneesh Jan 23, 2025
a5b0527
resumability test
makalaaneesh Jan 23, 2025
ca46d32
fix cleanup
makalaaneesh Jan 23, 2025
530e31c
integration tag
makalaaneesh Jan 23, 2025
8871fac
different table names to avoid read restart errors
makalaaneesh Jan 23, 2025
309995b
readrestart retry
makalaaneesh Jan 23, 2025
ff9eb76
sleep between retry
makalaaneesh Jan 23, 2025
6d98ebe
cleanup
makalaaneesh Jan 24, 2025
a3ec688
close batch producer
makalaaneesh Jan 24, 2025
118418a
clean up test
makalaaneesh Jan 24, 2025
9dd04be
reduce max restarts
makalaaneesh Jan 24, 2025
b9b87cf
if check before closing
makalaaneesh Jan 24, 2025
be9ca49
task picker initial commit
makalaaneesh Jan 27, 2025
653a3a4
run tests
makalaaneesh Jan 27, 2025
d9d9699
run tests 2
makalaaneesh Jan 27, 2025
b08d7af
empty
makalaaneesh Jan 27, 2025
c83aa51
Revert "run tests 2"
makalaaneesh Jan 27, 2025
d31a434
Revert "run tests"
makalaaneesh Jan 27, 2025
a4bd311
minor refactor + bug fix
makalaaneesh Jan 27, 2025
5213f07
better implementation for MarkTaskAsDone
makalaaneesh Jan 27, 2025
520745c
review ccomments
makalaaneesh Jan 28, 2025
8f8304a
review comments 2
makalaaneesh Jan 28, 2025
fb1e861
Merge branch 'aneesh/import-multiple-tables-at-same-time-2' into anee…
makalaaneesh Jan 28, 2025
71f37f7
test cases
makalaaneesh Jan 28, 2025
72684d8
fix old behaviour
makalaaneesh Jan 28, 2025
4bbbd0d
comment
makalaaneesh Jan 28, 2025
0337345
fix error
makalaaneesh Jan 28, 2025
e514e8c
Merge branch 'aneesh/import-multiple-tables-at-same-time-2' into anee…
makalaaneesh Jan 29, 2025
02a4f1f
base
makalaaneesh Jan 29, 2025
785a0f2
init picker
makalaaneesh Jan 29, 2025
0e02acf
HasMoreTasks, MarkTaskAsDone
makalaaneesh Jan 29, 2025
56df88d
NextTask WIP
makalaaneesh Jan 29, 2025
126311f
NextTask
makalaaneesh Jan 29, 2025
fceb7bc
basic test and some fixes
makalaaneesh Jan 29, 2025
df0a35e
fix test
makalaaneesh Jan 29, 2025
64bf875
comments
makalaaneesh Jan 29, 2025
88169c1
test cases
makalaaneesh Jan 29, 2025
91ba705
comment
makalaaneesh Jan 29, 2025
4183f0f
test idea
makalaaneesh Jan 29, 2025
bdec1d7
test idea
makalaaneesh Jan 29, 2025
585d1a3
test idea
makalaaneesh Jan 29, 2025
70d345a
tests colocated, sharded
makalaaneesh Jan 30, 2025
9f3e028
bug fix
makalaaneesh Jan 30, 2025
5ebf60b
comment
makalaaneesh Jan 30, 2025
ef95274
Merge branch 'main' into aneesh/reuse-worker-pool-sequential-task-picker
makalaaneesh Feb 5, 2025
8e9be5b
empty
makalaaneesh Feb 5, 2025
a4129b3
Merge branch 'aneesh/reuse-worker-pool-sequential-task-picker' into a…
makalaaneesh Feb 5, 2025
f2023a7
tests
makalaaneesh Feb 5, 2025
920583f
remove comments
makalaaneesh Feb 5, 2025
1565ea1
updated tests to test chooser probabilities after picking tasks
makalaaneesh Feb 5, 2025
cb5c4d2
more tests
makalaaneesh Feb 6, 2025
93eb55a
use colocatedAwareRandomTaskPicker
makalaaneesh Feb 6, 2025
957ef0c
return error
makalaaneesh Feb 6, 2025
df3cded
fix for resuming
makalaaneesh Feb 6, 2025
f44cecb
Merge branch 'main' into aneesh/colocated-aware-random-picker
makalaaneesh Feb 12, 2025
0fd4447
resolve merge
makalaaneesh Feb 12, 2025
2ba9d5f
resolve merge conflicts
makalaaneesh Feb 12, 2025
0e21028
fix test
makalaaneesh Feb 12, 2025
59edf8e
logs
makalaaneesh Feb 17, 2025
fcf19b2
resumable test
makalaaneesh Feb 17, 2025
aad59d4
resumable test
makalaaneesh Feb 17, 2025
b6d968a
more comments; initialize chooser early instead of lazy
makalaaneesh Feb 17, 2025
ade6ef6
review comments
makalaaneesh Feb 17, 2025
12d1a41
Merge branch 'main' into aneesh/colocated-aware-random-picker
makalaaneesh Feb 18, 2025
3022730
review comments
makalaaneesh Feb 18, 2025
7a09185
comment
makalaaneesh Feb 18, 2025
32ba5b9
initialize chooser only if pending tasks are there
makalaaneesh Feb 18, 2025
4 changes: 3 additions & 1 deletion migtests/tests/import-file/validate
@@ -132,7 +132,9 @@ TABLE_NAME_DATA_MAP = {
 def file_import_done_checks(tgt):
     fetched_row_cnts = tgt.row_count_of_all_tables()
     print(f"Row counts after import data file: {fetched_row_cnts}")
-    assert fetched_row_cnts == EXPECTED
+    for table_name, rc in EXPECTED.items():
+        assert fetched_row_cnts[table_name] == rc, f"Row count mismatch for table {table_name}, expected: {rc}, got: {fetched_row_cnts[table_name]}"
+
 
     fetched_row_cnts_non_public = tgt.row_count_of_all_tables("non_public")
     print(f"Row counts after import data file in non_public schema: {fetched_row_cnts_non_public}")
114 changes: 99 additions & 15 deletions yb-voyager/cmd/importData.go
@@ -155,6 +155,7 @@ func importDataCommandFn(cmd *cobra.Command, args []string) {
 	// TODO: handle case-sensitive in table names with oracle ff-db
 	// quoteTableNameIfRequired()
 	importFileTasks := discoverFilesToImport()
+	log.Debugf("Discovered import file tasks: %v", importFileTasks)
 	if importerRole == TARGET_DB_IMPORTER_ROLE {
 
 		importType = record.ExportType
@@ -316,6 +317,10 @@ type ImportFileTask struct {
 	FileSize int64
 }
 
+func (task *ImportFileTask) String() string {
+	return fmt.Sprintf("{ID: %d, FilePath: %s, TableName: %s, RowCount: %d, FileSize: %d}", task.ID, task.FilePath, task.TableNameTup.ForOutput(), task.RowCount, task.FileSize)
+}
+
 // func quoteTableNameIfRequired() {
 // 	if tconf.TargetDBType != ORACLE {
 // 		return
@@ -543,6 +548,8 @@ func importData(importFileTasks []*ImportFileTask) {
 			utils.ErrExit("Failed to classify tasks: %s", err)
 		}
 	}
+	log.Debugf("pending tasks: %v", pendingTasks)
+	log.Debugf("completed tasks: %v", completedTasks)
 
 	//TODO: BUG: we are applying table-list filter on importFileTasks, but here we are considering all tables as per
 	// export-data table-list. Should be fine because we are only disabling and re-enabling, but this is still not ideal.
@@ -569,6 +576,7 @@ func importData(importFileTasks []*ImportFileTask) {
 	utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks))
 	prepareTableToColumns(pendingTasks) //prepare the tableToColumns map
 	poolSize := tconf.Parallelism * 2
+	maxTasksInProgress := tconf.Parallelism
 	if tconf.EnableYBAdaptiveParallelism {
 		// in case of adaptive parallelism, we need to use maxParalllelism * 2
 		yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
@@ -584,28 +592,36 @@ func importData(importFileTasks []*ImportFileTask) {
 		controlPlane.UpdateImportedRowCount(importDataAllTableMetrics)
 	}
 
-	for _, task := range pendingTasks {
-		// The code can produce `poolSize` number of batches at a time. But, it can consume only
-		// `parallelism` number of batches at a time.
-		batchImportPool = pool.New().WithMaxGoroutines(poolSize)
-		log.Infof("created batch import pool of size: %d", poolSize)
-
-		taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
-		if err != nil {
-			utils.ErrExit("Failed to create file task importer: %s", err)
-		}
-
-		for !taskImporter.AllBatchesSubmitted() {
-			err := taskImporter.SubmitNextBatch()
-			if err != nil {
-				utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
-			}
-		}
-
-		batchImportPool.Wait() // Wait for the file import to finish.
-		taskImporter.PostProcess()
-	}
-	time.Sleep(time.Second * 2)
+	useTaskPicker := utils.GetEnvAsBool("USE_TASK_PICKER_FOR_IMPORT", true)
+	if useTaskPicker {
+		err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize, maxTasksInProgress)
+		if err != nil {
+			utils.ErrExit("Failed to import tasks via task picker: %s", err)
+		}
+	} else {
+		for _, task := range pendingTasks {
+			// The code can produce `poolSize` number of batches at a time. But, it can consume only
+			// `parallelism` number of batches at a time.
+			batchImportPool = pool.New().WithMaxGoroutines(poolSize)
+			log.Infof("created batch import pool of size: %d", poolSize)
+
+			taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
+			if err != nil {
+				utils.ErrExit("Failed to create file task importer: %s", err)
+			}
+
+			for !taskImporter.AllBatchesSubmitted() {
+				err := taskImporter.SubmitNextBatch()
+				if err != nil {
+					utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
+				}
+			}
+
+			batchImportPool.Wait() // wait for file import to finish
+			taskImporter.PostProcess()
+		}
+		time.Sleep(time.Second * 2)
+	}
 	utils.PrintAndLog("snapshot data import complete\n\n")
 }
@@ -684,6 +700,74 @@ func importData(importFileTasks []*ImportFileTask) {
 
 }
 
+func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int, maxTasksInProgress int) error {
+	// The code can produce `poolSize` number of batches at a time. But, it can consume only
+	// `parallelism` number of batches at a time.
+	batchImportPool = pool.New().WithMaxGoroutines(poolSize)
+	log.Infof("created batch import pool of size: %d", poolSize)
+
+	var taskPicker FileTaskPicker
+	var err error
+	if importerRole == TARGET_DB_IMPORTER_ROLE || importerRole == IMPORT_FILE_ROLE {
+		yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
+		if !ok {
+			return fmt.Errorf("expected tdb to be of type TargetYugabyteDB, got: %T", tdb)
+		}
+		taskPicker, err = NewColocatedAwareRandomTaskPicker(maxTasksInProgress, pendingTasks, state, yb)
+		if err != nil {
+			return fmt.Errorf("create colocated aware random task picker: %w", err)
+		}
+	} else {
+		taskPicker, err = NewSequentialTaskPicker(pendingTasks, state)
+		if err != nil {
+			return fmt.Errorf("create sequential task picker: %w", err)
+		}
+	}
+
+	taskImporters := map[int]*FileTaskImporter{}
+
+	for taskPicker.HasMoreTasks() {
+		task, err := taskPicker.NextTask()
+		if err != nil {
+			return fmt.Errorf("get next task: %w", err)
+		}
+		var taskImporter *FileTaskImporter
+		var ok bool
+		taskImporter, ok = taskImporters[task.ID]
+		if !ok {
+			taskImporter, err = NewFileTaskImporter(task, state, batchImportPool, progressReporter)
+			if err != nil {
+				return fmt.Errorf("create file task importer: %s", err)
+			}
+			log.Infof("created file task importer for table: %s, task: %v", task.TableNameTup.ForOutput(), task)
+			taskImporters[task.ID] = taskImporter
+		}
+
+		if taskImporter.AllBatchesSubmitted() {
+			// All batches for this task have been submitted.
+			// The task could be completed (all batches imported) OR still in progress.
+			// If the task is done, inform the task picker so that it stops picking this task.
+			taskDone, err := taskImporter.AllBatchesImported()
+			if err != nil {
+				return fmt.Errorf("check if all batches are imported: task: %v err: %w", task, err)
+			}
+			if taskDone {
+				taskImporter.PostProcess()
+				err = taskPicker.MarkTaskAsDone(task)
+				if err != nil {
+					return fmt.Errorf("mark task as done: task: %v, err: %w", task, err)
+				}
+			}
+			continue
+		}
+		err = taskImporter.SubmitNextBatch()
+		if err != nil {
+			return fmt.Errorf("submit next batch: task:%v err: %s", task, err)
+		}
+	}
+	return nil
+}
+
 func startAdaptiveParallelism() (bool, error) {
 	yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
 	if !ok {
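
Note: the FileTaskPicker interface itself is defined outside the files shown in this diff. Reconstructed from the call sites in importTasksViaTaskPicker above, the contract amounts to the following sketch (the method set beyond these three calls is an assumption):

type FileTaskPicker interface {
	// HasMoreTasks reports whether any task is still pending or in progress.
	HasMoreTasks() bool
	// NextTask returns a task to work on next. The same task may be returned
	// repeatedly across calls until MarkTaskAsDone is invoked for it.
	NextTask() (*ImportFileTask, error)
	// MarkTaskAsDone tells the picker to stop handing out the given task.
	MarkTaskAsDone(task *ImportFileTask) error
}

Both NewSequentialTaskPicker and NewColocatedAwareRandomTaskPicker would satisfy this contract; the driver loop above is written purely against it.
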
12 changes: 6 additions & 6 deletions yb-voyager/cmd/importDataFileBatchProducer_test.go
@@ -39,7 +39,7 @@ func TestBasicFileBatchProducer(t *testing.T) {
 
 	fileContents := `id,val
 1, "hello"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
@@ -71,7 +71,7 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) {
 2, "world"
 3, "foo"
 4, "bar"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
@@ -122,7 +122,7 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) {
 2, "ghijk"
 3, "mnopq"
 4, "stuvw"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
@@ -182,7 +182,7 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) {
 2, "ghijk"
 3, "mnopq1234567899876543"
 4, "stuvw"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
@@ -218,7 +218,7 @@ func TestFileBatchProducerResumable(t *testing.T) {
 2, "world"
 3, "foo"
 4, "bar"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
@@ -274,7 +274,7 @@ func TestFileBatchProducerResumeAfterAllBatchesProduced(t *testing.T) {
 2, "world"
 3, "foo"
 4, "bar"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
 	assert.NoError(t, err)
 
 	batchproducer, err := NewFileBatchProducer(task, state)
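
Note: every call site above gains a trailing 1 — an explicit task ID, matching the ID field on ImportFileTask. The helper itself is not part of this diff; a minimal sketch of its assumed shape after the change (the parameter name, file layout, and omitted fields are guesses, and os and path/filepath imports are assumed):

// Hypothetical reconstruction of the test helper; only the trailing taskID
// parameter is new. TableNameTup setup is elided because its construction is
// not visible in this diff.
func createFileAndTask(exportDir string, fileContents string, dataDir string, tableName string, taskID int) (string, *ImportFileTask, error) {
	filePath := filepath.Join(dataDir, tableName+"_data.csv") // assumed layout
	if err := os.WriteFile(filePath, []byte(fileContents), 0644); err != nil {
		return "", nil, err
	}
	task := &ImportFileTask{
		ID:       taskID, // previously implicit; now controlled by each test
		FilePath: filePath,
		FileSize: int64(len(fileContents)),
	}
	return filePath, task, nil
}
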
6 changes: 4 additions & 2 deletions yb-voyager/cmd/importDataFileCommand.go
@@ -168,7 +168,8 @@ func prepareImportFileTasks() []*ImportFileTask {
 		return result
 	}
 	kvs := strings.Split(fileTableMapping, ",")
-	for i, kv := range kvs {
+	idIndex := 0
+	for _, kv := range kvs {
 		globPattern, table := strings.Split(kv, ":")[0], strings.Split(kv, ":")[1]
 		filePaths, err := dataStore.Glob(globPattern)
 		if err != nil {
@@ -187,12 +188,13 @@
 			utils.ErrExit("calculating file size in bytes: %q: %v", filePath, err)
 		}
 		task := &ImportFileTask{
-			ID:           i,
+			ID:           idIndex,
 			FilePath:     filePath,
 			TableNameTup: tableNameTuple,
 			FileSize:     fileSize,
 		}
 		result = append(result, task)
+		idIndex++
 	}
 	}
 	return result
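
With the old for i, kv := range kvs, the task ID was the index of the mapping entry, so two files matched by the same glob shared an ID — and IDs were no longer unique keys. The new idIndex increments once per file. A self-contained illustration with hypothetical inputs:

package main

import "fmt"

type entry struct {
	mapping string
	files   []string // files matched by this entry's glob
}

func main() {
	// Hypothetical file-table mapping: the first glob matches two files.
	entries := []entry{
		{"orders*.csv:orders", []string{"orders1.csv", "orders2.csv"}},
		{"users.csv:users", []string{"users.csv"}},
	}
	// Old behaviour: ID = mapping-entry index, so both orders files get 0.
	// New behaviour (below): idIndex increments per file, giving 0, 1, 2 —
	// unique IDs, which the taskImporters map keyed by task.ID relies on.
	idIndex := 0
	for _, e := range entries {
		for _, f := range e.files {
			fmt.Printf("mapping=%q file=%q id=%d\n", e.mapping, f, idIndex)
			idIndex++
		}
	}
}
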
10 changes: 7 additions & 3 deletions yb-voyager/cmd/importDataFileTaskImporter.go
@@ -75,9 +75,12 @@ func (fti *FileTaskImporter) AllBatchesSubmitted() bool {
 	return fti.batchProducer.Done()
 }
 
-func (fti *FileTaskImporter) AllBatchesImported() error {
-	// TODO: check importDataState for status.
-	panic("not implemented")
+func (fti *FileTaskImporter) AllBatchesImported() (bool, error) {
+	taskStatus, err := fti.state.GetFileImportState(fti.task.FilePath, fti.task.TableNameTup)
+	if err != nil {
+		return false, fmt.Errorf("getting file import state: %s", err)
+	}
+	return taskStatus == FILE_IMPORT_COMPLETED, nil
 }
 
 func (fti *FileTaskImporter) SubmitNextBatch() error {
@@ -234,6 +237,7 @@ func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *t
 	}
 	// If `columns` is unset at this point, no attribute list is passed in the COPY command.
 	fileFormat := dataFileDescriptor.FileFormat
+
 	// from export data with ora2pg, it comes as an SQL file, with COPY command having data.
 	// Import-data also reads it appropriately with the help of sqlDataFile.
 	// But while running COPY for a batch, we need to set the format as TEXT (SQL does not make sense)
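
The reworked AllBatchesImported makes the submitted/imported distinction explicit: AllBatchesSubmitted is local bookkeeping (the batch producer has handed everything to the pool), while AllBatchesImported is read back from the persisted import state and may lag until pool workers finish. A short sketch of how a caller combines the two, mirroring the driver loop in importData.go above:

// Done-ness is two-phase: submission completes first, import completes later.
if taskImporter.AllBatchesSubmitted() {
	imported, err := taskImporter.AllBatchesImported() // reads ImportDataState
	if err != nil {
		return err
	}
	if imported {
		taskImporter.PostProcess()
		// safe to stop picking this task now
	}
	// else: batches are still in flight; poll again on a later pick
}
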
8 changes: 4 additions & 4 deletions yb-voyager/cmd/importDataFileTaskImporter_test.go
@@ -47,7 +47,7 @@ func TestBasicTaskImport(t *testing.T) {
 	fileContents := `id,val
 1, "hello"
 2, "world"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_basic")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_basic", 1)
 	testutils.FatalIfError(t, err)
 
 	progressReporter := NewImportDataProgressReporter(true)
@@ -88,7 +88,7 @@ func TestImportAllBatchesAndResume(t *testing.T) {
 	fileContents := `id,val
 1, "hello"
 2, "world"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_all")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_all", 1)
 	testutils.FatalIfError(t, err)
 
 	progressReporter := NewImportDataProgressReporter(true)
@@ -139,7 +139,7 @@ func TestTaskImportResumable(t *testing.T) {
 2, "world"
 3, "foo"
 4, "bar"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume", 1)
 	testutils.FatalIfError(t, err)
 
 	progressReporter := NewImportDataProgressReporter(true)
@@ -198,7 +198,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {
 2, "world"
 3, "foo"
 4, "bar"`
-	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume_no_pk")
+	_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume_no_pk", 1)
 	testutils.FatalIfError(t, err)
 
 	progressReporter := NewImportDataProgressReporter(true)
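
The implementation of NewColocatedAwareRandomTaskPicker is not among the files shown in this diff. Going only by its name, the constructor arguments (maxTasksInProgress, the pending tasks, the import state, and a YugabyteDB handle — presumably for looking up whether each table is colocated), and the commit messages about chooser probabilities, the policy plausibly looks something like the sketch below. This is an illustration of the idea — weight all colocated tables as a single unit so they don't crowd out sharded tables — not the PR's actual code. It reuses ImportFileTask from importData.go and assumes fmt and math/rand are imported; MarkTaskAsDone/HasMoreTasks bookkeeping is omitted for brevity.

type colocatedAwarePickerSketch struct {
	inProgress       []*ImportFileTask // tasks currently being batched
	pendingColocated []*ImportFileTask // tasks whose tables are colocated
	pendingSharded   []*ImportFileTask // tasks whose tables are sharded
	maxInProgress    int               // cap on concurrently picked tasks
	rng              *rand.Rand
}

func (p *colocatedAwarePickerSketch) NextTask() (*ImportFileTask, error) {
	// Top up the in-progress set before picking.
	for len(p.inProgress) < p.maxInProgress {
		next := p.takePending()
		if next == nil {
			break
		}
		p.inProgress = append(p.inProgress, next)
	}
	if len(p.inProgress) == 0 {
		return nil, fmt.Errorf("no tasks left")
	}
	// Pick uniformly among in-progress tasks, interleaving batches across
	// tables instead of draining one file at a time.
	return p.inProgress[p.rng.Intn(len(p.inProgress))], nil
}

// takePending gives the whole colocated group one "slot" and each sharded
// table its own slot, so a long tail of small colocated tables does not
// dominate the random choice.
func (p *colocatedAwarePickerSketch) takePending() *ImportFileTask {
	slots := len(p.pendingSharded)
	if len(p.pendingColocated) > 0 {
		slots++ // one shared slot for all colocated tables
	}
	if slots == 0 {
		return nil
	}
	k := p.rng.Intn(slots)
	if len(p.pendingColocated) > 0 {
		if k == 0 {
			t := p.pendingColocated[0]
			p.pendingColocated = p.pendingColocated[1:]
			return t
		}
		k-- // shift past the colocated slot
	}
	t := p.pendingSharded[k]
	p.pendingSharded = append(p.pendingSharded[:k], p.pendingSharded[k+1:]...)
	return t
}
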