Skip to content

Commit

Permalink
Increased default batch size for PostgreSQL Target to 100K rows (#1301)
Browse files Browse the repository at this point in the history
- Refactoring of registering batch-size/parallel-jobs flags for import to target and import to source-replica to CLI msgs
  • Loading branch information
priyanshi-yb authored Feb 1, 2024
1 parent cadda8e commit 7363dda
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 22 deletions.
1 change: 1 addition & 0 deletions yb-voyager/cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
MAX_SLEEP_SECOND = 60
DEFAULT_BATCH_SIZE_ORACLE = 10000000
DEFAULT_BATCH_SIZE_YUGABYTEDB = 20000
DEFAULT_BATCH_SIZE_POSTGRESQL = 100000
INDEX_RETRY_COUNT = 5
DDL_MAX_RETRY_COUNT = 5
SCHEMA_VERSION_MISMATCH_ERR = "Query error: schema version mismatch for table"
Expand Down
35 changes: 21 additions & 14 deletions yb-voyager/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,6 @@ func registerImportDataCommonFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&tableListFilePath, "table-list-file-path", "",
"path of the file containing the list of the source db table names to import data")

defaultBatchSizeMsg := fmt.Sprintf("(default: target(%d), source-replica/source(%d))", DEFAULT_BATCH_SIZE_YUGABYTEDB, DEFAULT_BATCH_SIZE_ORACLE)
if cmd.CommandPath() == "yb-voyager import data file" {
defaultBatchSizeMsg = "(default: 20000)"
}
cmd.Flags().Int64Var(&batchSize, "batch-size", 0,
fmt.Sprintf("Size of batches in the number of rows generated for ingestion during import %s", defaultBatchSizeMsg))
defaultParallelismMsg := "By default, voyager will try if it can determine the total number of cores N and use N/2 as parallel jobs. " +
"Otherwise, it fall back to using twice the number of nodes in the cluster."
if cmd.CommandPath() == "yb-voyager import data to source" || cmd.CommandPath() == "yb-voyager import data to source-replica" {
defaultParallelismMsg = "(default: 16(Oracle))"
}
cmd.Flags().IntVar(&tconf.Parallelism, "parallel-jobs", 0,
"number of parallel jobs to use while importing data. "+defaultParallelismMsg)

BoolVar(cmd.Flags(), &tconf.EnableUpsert, "enable-upsert", true,
"Enable UPSERT mode on target tables")
BoolVar(cmd.Flags(), &tconf.UsePublicIP, "use-public-ip", false,
Expand Down Expand Up @@ -347,10 +333,29 @@ func checkOrSetDefaultTargetSSLMode() {
}
}

func registerFlagsForTarget(cmd *cobra.Command) {
cmd.Flags().Int64Var(&batchSize, "batch-size", 0,
fmt.Sprintf("Size of batches in the number of rows generated for ingestion during import. default(%d)", DEFAULT_BATCH_SIZE_YUGABYTEDB))
cmd.Flags().IntVar(&tconf.Parallelism, "parallel-jobs", 0,
"number of parallel jobs to use while importing data. By default, voyager will try if it can determine the total "+
"number of cores N and use N/4 as parallel jobs. "+
"Otherwise, it fall back to using twice the number of nodes in the cluster.")
}

func registerFlagsForSourceReplica(cmd *cobra.Command) {
cmd.Flags().Int64Var(&batchSize, "batch-size", 0,
fmt.Sprintf("Size of batches in the number of rows generated for ingestion during import. default: ORACLE(%d), POSTGRESQL(%d)", DEFAULT_BATCH_SIZE_ORACLE, DEFAULT_BATCH_SIZE_POSTGRESQL))
cmd.Flags().IntVar(&tconf.Parallelism, "parallel-jobs", 0,
"number of parallel jobs to use while importing data. default: For PostgreSQL(voyager will try if it can determine the total " +
"number of cores N and use N/2 as parallel jobs else it will fall back to 8) and Oracle(16)")
}

func validateBatchSizeFlag(numLinesInASplit int64) {
if batchSize == 0 {
if tconf.TargetDBType == ORACLE {
batchSize = DEFAULT_BATCH_SIZE_ORACLE
} else if tconf.TargetDBType == POSTGRESQL {
batchSize = DEFAULT_BATCH_SIZE_POSTGRESQL
} else {
batchSize = DEFAULT_BATCH_SIZE_YUGABYTEDB
}
Expand All @@ -360,6 +365,8 @@ func validateBatchSizeFlag(numLinesInASplit int64) {
var defaultBatchSize int64
if tconf.TargetDBType == ORACLE {
defaultBatchSize = DEFAULT_BATCH_SIZE_ORACLE
} else if tconf.TargetDBType == POSTGRESQL {
defaultBatchSize = DEFAULT_BATCH_SIZE_POSTGRESQL
} else {
defaultBatchSize = DEFAULT_BATCH_SIZE_YUGABYTEDB
}
Expand Down
3 changes: 2 additions & 1 deletion yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,8 @@ func init() {
importCmd.AddCommand(importDataCmd)
importDataCmd.AddCommand(importDataToCmd)
importDataToCmd.AddCommand(importDataToTargetCmd)

registerFlagsForTarget(importDataCmd)
registerFlagsForTarget(importDataToTargetCmd)
registerCommonGlobalFlags(importDataCmd)
registerCommonGlobalFlags(importDataToTargetCmd)
registerCommonImportFlags(importDataCmd)
Expand Down
3 changes: 2 additions & 1 deletion yb-voyager/cmd/importDataFileCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func prepareForImportDataCmd(importFileTasks []*ImportFileTask) {
if err != nil {
utils.ErrExit("failed to update migration status record: %v", err)
}

dataFileList := getFileSizeInfo(importFileTasks)
dataFileDescriptor = &datafile.Descriptor{
FileFormat: fileFormat,
Expand Down Expand Up @@ -354,6 +354,7 @@ func init() {
registerCommonGlobalFlags(importDataFileCmd)
registerTargetDBConnFlags(importDataFileCmd)
registerImportDataCommonFlags(importDataFileCmd)
registerFlagsForTarget(importDataFileCmd)

importDataFileCmd.Flags().StringVar(&fileFormat, "format", "csv",
fmt.Sprintf("supported data file types: (%v)", strings.Join(supportedFileFormats, ",")))
Expand Down
7 changes: 4 additions & 3 deletions yb-voyager/cmd/importDataToSource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

var importDataToSourceCmd = &cobra.Command{
Use: "source",
Use: "source",
Short: "Import data into the source DB to prepare for fall-back.\n" +
"For more details and examples, visit https://docs.yugabyte.com/preview/yugabyte-voyager/migrate/live-fall-back/",
Long: ``,
"For more details and examples, visit https://docs.yugabyte.com/preview/yugabyte-voyager/migrate/live-fall-back/",
Long: ``,

Run: func(cmd *cobra.Command, args []string) {
validateMetaDBCreated()
Expand All @@ -47,6 +47,7 @@ func init() {
registerCommonGlobalFlags(importDataToSourceCmd)
registerCommonImportFlags(importDataToSourceCmd)
registerSourceDBAsTargetConnFlags(importDataToSourceCmd)
registerFlagsForSourceReplica(importDataToSourceCmd)
registerImportDataCommonFlags(importDataToSourceCmd)
hideImportFlagsInFallForwardOrBackCmds(importDataToSourceCmd)
importDataToSourceCmd.Flags().MarkHidden("batch-size")
Expand Down
15 changes: 12 additions & 3 deletions yb-voyager/cmd/importDataToSourceReplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func setTargetConfSpecifics(cmd *cobra.Command) {
if cmd.Flags().Lookup("source-replica-db-schema").Changed {
utils.ErrExit("cannot specify --source-replica-db-schema for PostgreSQL source")
} else {
tconf.Schema = strings.Join(strings.Split(sconf.Schema, "|"),",")
}
tconf.Schema = strings.Join(strings.Split(sconf.Schema, "|"), ",")
}
}
}

Expand All @@ -60,11 +60,20 @@ func init() {
registerCommonGlobalFlags(importDataToSourceReplicaCmd)
registerCommonImportFlags(importDataToSourceReplicaCmd)
registerSourceReplicaDBAsTargetConnFlags(importDataToSourceReplicaCmd)
registerFlagsForSourceReplica(importDataToSourceReplicaCmd)
registerStartCleanFlag(importDataToSourceReplicaCmd)
registerImportDataCommonFlags(importDataToSourceReplicaCmd)
registerImportDataFlags(importDataToSourceReplicaCmd)
hideImportFlagsInFallForwardOrBackCmds(importDataToSourceReplicaCmd)
}

func registerStartCleanFlag(cmd *cobra.Command) {
BoolVar(cmd.Flags(), &startClean, "start-clean", false,
`Starts a fresh import with exported data files present in the export-dir/data directory.
If any table on source-replica database is non-empty, it prompts whether you want to continue the import without truncating those tables;
If you go ahead without truncating, then yb-voyager starts ingesting the data present in the data files without upsert mode.
Note that for the cases where a table doesn't have a primary key, this may lead to insertion of duplicate data. To avoid this, exclude the table using the --exclude-file-list or truncate those tables manually before using the start-clean flag (default false)`)
}

func updateFallForwardEnabledInMetaDB() {
err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
record.FallForwardEnabled = true
Expand Down

0 comments on commit 7363dda

Please sign in to comment.