Skip to content

Commit

Permalink
Do not check for enabled triggers / FKs for fallback workflow of PG i…
Browse files Browse the repository at this point in the history
…f session_replication_role session variable is used (#2322)

Adding GRANT SQL for granting the SET on session_replication_role to source-db-user to utilize it in Fallback case for import data to source
Added a check in GetEnabledTriggersAndFks(), to not fetch the triggers/Fks in case this session parameter is used.
  • Loading branch information
priyanshi-yb authored Feb 20, 2025
1 parent 411843a commit fb10bbd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,5 +228,14 @@ GRANT pg_read_all_stats to :voyager_user;
WHERE
schema_name = ANY(string_to_array(:'schema_list', ','))
\gexec

DO $$
BEGIN
IF (substring((SELECT setting FROM pg_catalog.pg_settings WHERE name = 'server_version'), '^[0-9]+')::int >= 15) THEN
RAISE NOTICE 'Granting set on PARAMETER session_replication_role TO %;', current_setting('myvars.voyager_user');
EXECUTE format('GRANT SET ON PARAMETER session_replication_role TO %I;', current_setting('myvars.voyager_user'));
END IF;
END $$;

\endif
\endif
11 changes: 9 additions & 2 deletions yb-voyager/src/tgtdb/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/slices"

"github.com/yugabyte/yb-voyager/yb-voyager/src/callhome"
"github.com/yugabyte/yb-voyager/yb-voyager/src/constants"
Expand Down Expand Up @@ -115,6 +116,9 @@ func (pg *TargetPostgreSQL) Init() error {
if err != nil {
return err
}
if len(pg.tconf.SessionVars) == 0 {
pg.tconf.SessionVars = getPGSessionInitScript(pg.tconf)
}
schemas := strings.Split(pg.tconf.Schema, ",")
schemaList := strings.Join(schemas, "','") // a','b','c
checkSchemaExistsQuery := fmt.Sprintf(
Expand Down Expand Up @@ -241,12 +245,11 @@ func (pg *TargetPostgreSQL) InitConnPool() error {
pg.tconf.Parallelism = fetchDefaultParallelJobs(tconfs, PG_DEFAULT_PARALLELISM_FACTOR)
log.Infof("Using %d parallel jobs by default. Use --parallel-jobs to specify a custom value", pg.tconf.Parallelism)
}

params := &ConnectionParams{
NumConnections: pg.tconf.Parallelism,
NumMaxConnections: pg.tconf.Parallelism,
ConnUriList: targetUriList,
SessionInitScript: getYBSessionInitScript(pg.tconf),
SessionInitScript: pg.tconf.SessionVars,
// works fine as we check the support of any session variable before using it in the script.
// So upsert and disable transaction will never be used for PG
}
Expand Down Expand Up @@ -1054,6 +1057,10 @@ func (pg *TargetPostgreSQL) getSchemaList() []string {
}

func (pg *TargetPostgreSQL) GetEnabledTriggersAndFks() (enabledTriggers []string, enabledFks []string, err error) {
if slices.Contains(pg.tconf.SessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) {
//Not check for any triggers / FKs in case this session parameter is used
return nil, nil, nil
}
querySchemaArray := pg.getSchemaList()
querySchemaList := strings.Join(querySchemaArray, ",")

Expand Down
1 change: 1 addition & 0 deletions yb-voyager/src/tgtdb/tconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type TargetConf struct {
Parallelism int `json:"parallelism"`
EnableYBAdaptiveParallelism utils.BoolStr `json:"enable_adaptive_parallelism"`
MaxParallelism int `json:"max_parallelism"` // in case adaptive parallelism is enabled.
SessionVars []string `json:"session_vars"`
}

func (t *TargetConf) Clone() *TargetConf {
Expand Down
19 changes: 17 additions & 2 deletions yb-voyager/src/tgtdb/yugabytedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func (yb *TargetYugabyteDB) Init() error {
return err
}

if len(yb.tconf.SessionVars) == 0 {
yb.tconf.SessionVars = getYBSessionInitScript(yb.tconf)
}

checkSchemaExistsQuery := fmt.Sprintf(
"SELECT count(nspname) FROM pg_catalog.pg_namespace WHERE nspname = '%s';",
yb.tconf.Schema)
Expand Down Expand Up @@ -241,7 +245,7 @@ func (yb *TargetYugabyteDB) InitConnPool() error {
NumConnections: yb.tconf.Parallelism,
NumMaxConnections: yb.tconf.MaxParallelism,
ConnUriList: targetUriList,
SessionInitScript: getYBSessionInitScript(yb.tconf),
SessionInitScript: yb.tconf.SessionVars,
}
yb.connPool = NewConnectionPool(params)
redactedParams := &ConnectionParams{}
Expand Down Expand Up @@ -996,6 +1000,17 @@ const (
ERROR_MSG_PERMISSION_DENIED = "permission denied"
)

func getPGSessionInitScript(tconf *TargetConf) []string {
var sessionVars []string
if checkSessionVariableSupport(tconf, SET_CLIENT_ENCODING_TO_UTF8) {
sessionVars = append(sessionVars, SET_CLIENT_ENCODING_TO_UTF8)
}
if checkSessionVariableSupport(tconf, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) {
sessionVars = append(sessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA)
}
return sessionVars
}

func getYBSessionInitScript(tconf *TargetConf) []string {
var sessionVars []string
if checkSessionVariableSupport(tconf, SET_CLIENT_ENCODING_TO_UTF8) {
Expand Down Expand Up @@ -1067,7 +1082,7 @@ func checkSessionVariableSupport(tconf *TargetConf, sqlStmt string) bool {
if !utils.AskPrompt("Are you sure you want to proceed?") {
utils.ErrExit("Aborting import.")
}
return true
return false // support is not there even if the target user doesn't have privileges to set this parameter.
}
utils.ErrExit("error while executing sqlStatement: %q: %v", sqlStmt, err)
} else {
Expand Down

0 comments on commit fb10bbd

Please sign in to comment.