From fb10bbdd7f33158de0a64fc8175841a26b15144c Mon Sep 17 00:00:00 2001 From: Priyanshi Gupta Date: Thu, 20 Feb 2025 16:35:32 +0530 Subject: [PATCH] Do not check for enabled triggers / FKs for fallback workflow of PG if session_replication_role session variable is used (#2322) Adding GRANT SQL for granting the SET on session_replication_role to source-db-user to utilize it in Fallback case for import data to source Added a check in GetEnabledTriggersAndFks(), to not fetch the triggers/Fks in case this session parameter is used. --- ...voyager-pg-grant-migration-permissions.sql | 9 +++++++++ yb-voyager/src/tgtdb/postgres.go | 11 +++++++++-- yb-voyager/src/tgtdb/tconf.go | 1 + yb-voyager/src/tgtdb/yugabytedb.go | 19 +++++++++++++++++-- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/guardrails-scripts/yb-voyager-pg-grant-migration-permissions.sql b/guardrails-scripts/yb-voyager-pg-grant-migration-permissions.sql index 81efa5c60c..a66902f17c 100644 --- a/guardrails-scripts/yb-voyager-pg-grant-migration-permissions.sql +++ b/guardrails-scripts/yb-voyager-pg-grant-migration-permissions.sql @@ -228,5 +228,14 @@ GRANT pg_read_all_stats to :voyager_user; WHERE schema_name = ANY(string_to_array(:'schema_list', ',')) \gexec + + DO $$ + BEGIN + IF (substring((SELECT setting FROM pg_catalog.pg_settings WHERE name = 'server_version'), '^[0-9]+')::int >= 15) THEN + RAISE NOTICE 'Granting set on PARAMETER session_replication_role TO %;', current_setting('myvars.voyager_user'); + EXECUTE format('GRANT SET ON PARAMETER session_replication_role TO %I;', current_setting('myvars.voyager_user')); + END IF; + END $$; + \endif \endif diff --git a/yb-voyager/src/tgtdb/postgres.go b/yb-voyager/src/tgtdb/postgres.go index 53f3a78fee..95b28a11df 100644 --- a/yb-voyager/src/tgtdb/postgres.go +++ b/yb-voyager/src/tgtdb/postgres.go @@ -33,6 +33,7 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" "github.com/samber/lo" log "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" "github.com/yugabyte/yb-voyager/yb-voyager/src/callhome" "github.com/yugabyte/yb-voyager/yb-voyager/src/constants" @@ -115,6 +116,9 @@ func (pg *TargetPostgreSQL) Init() error { if err != nil { return err } + if len(pg.tconf.SessionVars) == 0 { + pg.tconf.SessionVars = getPGSessionInitScript(pg.tconf) + } schemas := strings.Split(pg.tconf.Schema, ",") schemaList := strings.Join(schemas, "','") // a','b','c checkSchemaExistsQuery := fmt.Sprintf( @@ -241,12 +245,11 @@ func (pg *TargetPostgreSQL) InitConnPool() error { pg.tconf.Parallelism = fetchDefaultParallelJobs(tconfs, PG_DEFAULT_PARALLELISM_FACTOR) log.Infof("Using %d parallel jobs by default. Use --parallel-jobs to specify a custom value", pg.tconf.Parallelism) } - params := &ConnectionParams{ NumConnections: pg.tconf.Parallelism, NumMaxConnections: pg.tconf.Parallelism, ConnUriList: targetUriList, - SessionInitScript: getYBSessionInitScript(pg.tconf), + SessionInitScript: pg.tconf.SessionVars, // works fine as we check the support of any session variable before using it in the script. // So upsert and disable transaction will never be used for PG } @@ -1054,6 +1057,10 @@ func (pg *TargetPostgreSQL) getSchemaList() []string { } func (pg *TargetPostgreSQL) GetEnabledTriggersAndFks() (enabledTriggers []string, enabledFks []string, err error) { + if slices.Contains(pg.tconf.SessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) { + //Not check for any triggers / FKs in case this session parameter is used + return nil, nil, nil + } querySchemaArray := pg.getSchemaList() querySchemaList := strings.Join(querySchemaArray, ",") diff --git a/yb-voyager/src/tgtdb/tconf.go b/yb-voyager/src/tgtdb/tconf.go index 5e9f659cd0..c661cfd5ef 100644 --- a/yb-voyager/src/tgtdb/tconf.go +++ b/yb-voyager/src/tgtdb/tconf.go @@ -57,6 +57,7 @@ type TargetConf struct { Parallelism int `json:"parallelism"` EnableYBAdaptiveParallelism utils.BoolStr `json:"enable_adaptive_parallelism"` MaxParallelism int `json:"max_parallelism"` // in case adaptive parallelism is enabled. + SessionVars []string `json:"session_vars"` } func (t *TargetConf) Clone() *TargetConf { diff --git a/yb-voyager/src/tgtdb/yugabytedb.go b/yb-voyager/src/tgtdb/yugabytedb.go index 181386c9e7..e346d46695 100644 --- a/yb-voyager/src/tgtdb/yugabytedb.go +++ b/yb-voyager/src/tgtdb/yugabytedb.go @@ -119,6 +119,10 @@ func (yb *TargetYugabyteDB) Init() error { return err } + if len(yb.tconf.SessionVars) == 0 { + yb.tconf.SessionVars = getYBSessionInitScript(yb.tconf) + } + checkSchemaExistsQuery := fmt.Sprintf( "SELECT count(nspname) FROM pg_catalog.pg_namespace WHERE nspname = '%s';", yb.tconf.Schema) @@ -241,7 +245,7 @@ func (yb *TargetYugabyteDB) InitConnPool() error { NumConnections: yb.tconf.Parallelism, NumMaxConnections: yb.tconf.MaxParallelism, ConnUriList: targetUriList, - SessionInitScript: getYBSessionInitScript(yb.tconf), + SessionInitScript: yb.tconf.SessionVars, } yb.connPool = NewConnectionPool(params) redactedParams := &ConnectionParams{} @@ -996,6 +1000,17 @@ const ( ERROR_MSG_PERMISSION_DENIED = "permission denied" ) +func getPGSessionInitScript(tconf *TargetConf) []string { + var sessionVars []string + if checkSessionVariableSupport(tconf, SET_CLIENT_ENCODING_TO_UTF8) { + sessionVars = append(sessionVars, SET_CLIENT_ENCODING_TO_UTF8) + } + if checkSessionVariableSupport(tconf, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) { + sessionVars = append(sessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) + } + return sessionVars +} + func getYBSessionInitScript(tconf *TargetConf) []string { var sessionVars []string if checkSessionVariableSupport(tconf, SET_CLIENT_ENCODING_TO_UTF8) { @@ -1067,7 +1082,7 @@ func checkSessionVariableSupport(tconf *TargetConf, sqlStmt string) bool { if !utils.AskPrompt("Are you sure you want to proceed?") { utils.ErrExit("Aborting import.") } - return true + return false // support is not there even if the target user doesn't have privileges to set this parameter. } utils.ErrExit("error while executing sqlStatement: %q: %v", sqlStmt, err) } else {