Skip to content

Commit

Permalink
Added retries in alter queries while restoring Auto generated Ideneti…
Browse files Browse the repository at this point in the history
…ty columns for 'while reaching out to tservers' error
  • Loading branch information
[email protected] committed Feb 5, 2024
1 parent 92f938b commit 82e6bb2
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions yb-voyager/src/tgtdb/yugabytedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ const BATCH_METADATA_TABLE_NAME = BATCH_METADATA_TABLE_SCHEMA + "." + "ybvoyager
const EVENT_CHANNELS_METADATA_TABLE_NAME = BATCH_METADATA_TABLE_SCHEMA + "." + "ybvoyager_import_data_event_channels_metainfo"
const EVENTS_PER_TABLE_METADATA_TABLE_NAME = BATCH_METADATA_TABLE_SCHEMA + "." + "ybvoyager_imported_event_count_by_table"
const YB_DEFAULT_PARALLELISM_FACTOR = 2 // factor for default parallelism in case fetchDefaultParallelJobs() is not able to get the no of cores
const ALTER_QUERY_RETRY_COUNT = 5

func (yb *TargetYugabyteDB) CreateVoyagerSchema() error {
cmds := []string{
Expand Down Expand Up @@ -1055,24 +1056,34 @@ func (yb *TargetYugabyteDB) alterColumns(tableColumnsMap map[string][]string, al
query := fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s %s`, qualifiedTableName, column, alterAction)
batch.Queue(query)
}

err := yb.connPool.WithConn(func(conn *pgx.Conn) (bool, error) {
br := conn.SendBatch(context.Background(), &batch)
for i := 0; i < batch.Len(); i++ {
_, err := br.Exec()
if err != nil {
log.Errorf("executing query to alter columns for table(%s): %v", qualifiedTableName, err)
return false, fmt.Errorf("executing query to alter columns for table(%s): %w", qualifiedTableName, err)
var sleepIntervalSec = 0
for i := 0; i < ALTER_QUERY_RETRY_COUNT; i++ {
err := yb.connPool.WithConn(func(conn *pgx.Conn) (bool, error) {
br := conn.SendBatch(context.Background(), &batch)
for i := 0; i < batch.Len(); i++ {
_, err := br.Exec()
if err != nil {
log.Errorf("executing query to alter columns for table(%s): %v", qualifiedTableName, err)
return false, fmt.Errorf("executing query to alter columns for table(%s): %w", qualifiedTableName, err)
}
}
if err := br.Close(); err != nil {
log.Errorf("closing batch of queries to alter columns for table(%s): %v", qualifiedTableName, err)
return false, fmt.Errorf("closing batch of queries to alter columns for table(%s): %w", qualifiedTableName, err)
}
return false, nil
})
if err != nil {
log.Errorf("error in altering columns for table(%s): %v", qualifiedTableName, err)
if !strings.Contains(err.Error(), "while reaching out to the tablet servers") {
return err
}
sleepIntervalSec += 10
log.Infof("retrying after %d seconds for table(%s)", sleepIntervalSec, qualifiedTableName)
time.Sleep(time.Duration(sleepIntervalSec) * time.Second)
} else {
break
}
if err := br.Close(); err != nil {
log.Errorf("closing batch of queries to alter columns for table(%s): %v", qualifiedTableName, err)
return false, fmt.Errorf("closing batch of queries to alter columns for table(%s): %w", qualifiedTableName, err)
}
return false, nil
})
if err != nil {
return err
}
}
return nil
Expand Down

0 comments on commit 82e6bb2

Please sign in to comment.