Skip to content

Commit

Permalink
Merge branch 'main' into priyanshi/callhome-error
Browse files Browse the repository at this point in the history
  • Loading branch information
priyanshi-yb committed Feb 20, 2025
2 parents 634dacb + ceb675b commit 184c8b5
Show file tree
Hide file tree
Showing 24 changed files with 1,534 additions and 72 deletions.
10 changes: 10 additions & 0 deletions installer_scripts/install-voyager-airgapped.sh
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,16 @@ check_yum_package_version() {
check_yum_dependencies() {
for requirement in "${centos_yum_package_requirements[@]}"; do
IFS='|' read -r package version_type required_version <<< "$requirement"

# In case of rhel9 the mysql-devel package is not available. However, mysql-community-devel is available. So check for that instead.
# Check if OS is rhel9
version=$(source /etc/os-release; echo "$VERSION_ID")
# Extract only the major version
majorVersion=$(echo $version | cut -d '.' -f 1)
if [[ "$majorVersion" -eq 9 && "$package" == "mysql-devel" ]]; then
package="mysql-community-devel"
fi

check_yum_package_version "$package" "$version_type" "$required_version"
done

Expand Down
11 changes: 8 additions & 3 deletions installer_scripts/install-yb-voyager
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ centos_main() {
output "Installing RPM dependencies."
$YUM_INSTALL which wget git gcc make 1>&2
$YUM_INSTALL https://download.postgresql.org/pub/repos/yum/reporpms/EL-${majorVersion}-x86_64/pgdg-redhat-repo-latest.noarch.rpm 1>&2 || true
# The postgresql17 package installs the client package and the postgresql17-server installs the server package.
# We require the client package to be installed for pg_dump and pg_restore.
$YUM_INSTALL postgresql17 1>&2
$YUM_INSTALL sqlite 1>&2
create_guardrail_scripts_dir
Expand Down Expand Up @@ -909,8 +911,8 @@ create_guardrail_scripts_dir() {

centos_install_mysql_client() {
version=$1
# If version is greater than or equal to 8 then dont install from community repo
if [ "$version" -ge 8 ]; then
# If version is equal to 8 then don't install from the community repo
if [ "$version" -eq 8 ]; then
$YUM_INSTALL mysql-devel 1>&2
return
fi
Expand All @@ -919,6 +921,7 @@ centos_install_mysql_client() {
sudo yum remove -y mysql84-community-release-el${version}-1 1>&2
sudo yum install -y mysql84-community-release-el${version}-1.noarch.rpm 1>&2
sudo yum install -y mysql-community-devel 1>&2
rm -f mysql84-community-release-el${version}-1.noarch.rpm
}

centos_check_base_repo_enabled() {
Expand Down Expand Up @@ -953,7 +956,9 @@ ubuntu_install_postgres() {
sudo apt install -y postgresql-common 1>&2
echo | sudo /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh 1>&2
sudo apt-get update 1>&2
sudo apt-get -y install postgresql-17 1>&2
# The postgresql-client-17 package installs the client, whereas postgresql-17 installs the server.
# We require the client package because it provides pg_dump and pg_restore.
sudo apt-get -y install postgresql-client-17 1>&2
output "Postgres Installed."
}

Expand Down
28 changes: 23 additions & 5 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func importData(importFileTasks []*ImportFileTask) {
utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks))
prepareTableToColumns(pendingTasks) //prepare the tableToColumns map
poolSize := tconf.Parallelism * 2
maxTasksInProgress := tconf.Parallelism
if tconf.EnableYBAdaptiveParallelism {
// in case of adaptive parallelism, we need to use maxParallelism * 2
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
Expand All @@ -593,7 +594,7 @@ func importData(importFileTasks []*ImportFileTask) {

useTaskPicker := utils.GetEnvAsBool("USE_TASK_PICKER_FOR_IMPORT", true)
if useTaskPicker {
err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize)
err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize, maxTasksInProgress)
if err != nil {
utils.ErrExit("Failed to import tasks via task picker: %s", err)
}
Expand Down Expand Up @@ -708,23 +709,38 @@ func importData(importFileTasks []*ImportFileTask) {
- For the task that is picked, produce the next batch and submit it to the worker pool. Worker will asynchronously import the batch.
- If task is done, mark it as done in the task picker.
*/
func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int) error {
func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int, maxTasksInProgress int) error {
// The code can produce `poolSize` number of batches at a time. But, it can consume only
// `parallelism` number of batches at a time.
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Infof("created batch import pool of size: %d", poolSize)

taskPicker, err := NewSequentialTaskPicker(pendingTasks, state)
if err != nil {
return fmt.Errorf("create task picker: %w", err)
var taskPicker FileTaskPicker
var err error
if importerRole == TARGET_DB_IMPORTER_ROLE || importerRole == IMPORT_FILE_ROLE {
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
if !ok {
return fmt.Errorf("expected tdb to be of type TargetYugabyteDB, got: %T", tdb)
}
taskPicker, err = NewColocatedAwareRandomTaskPicker(maxTasksInProgress, pendingTasks, state, yb)
if err != nil {
return fmt.Errorf("create colocated aware randmo task picker: %w", err)
}
} else {
taskPicker, err = NewSequentialTaskPicker(pendingTasks, state)
if err != nil {
return fmt.Errorf("create sequential task picker: %w", err)
}
}

taskImporters := map[int]*FileTaskImporter{}

for taskPicker.HasMoreTasks() {
task, err := taskPicker.Pick()
if err != nil {
return fmt.Errorf("get next task: %w", err)
}
log.Infof("Picked task for import: %s", task)
var taskImporter *FileTaskImporter
var ok bool
taskImporter, ok = taskImporters[task.ID]
Expand All @@ -741,6 +757,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
// All batches for this task have been submitted.
// task could have been completed (all batches imported) OR still in progress
// in case task is done, we should inform task picker so that we stop picking that task.
log.Infof("All batches submitted for task: %s", task)
taskDone, err := taskImporter.AllBatchesImported()
if err != nil {
return fmt.Errorf("check if all batches are imported: task: %v err :%w", task, err)
Expand All @@ -751,6 +768,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
if err != nil {
return fmt.Errorf("mark task as done: task: %v, err: %w", task, err)
}
log.Infof("Import of task done: %s", task)
continue
} else {
// some batches are still in progress, wait for them to complete as decided by the picker.
Expand Down
Loading

0 comments on commit 184c8b5

Please sign in to comment.