From e2c028f701ae985b849600736ee98b0c8c2e0222 Mon Sep 17 00:00:00 2001 From: Sanyam Singhal Date: Tue, 23 Jan 2024 17:51:05 +0530 Subject: [PATCH] Removing log line for cleaning child processes in case there is no child process e.g. version cmd --- migtests/lib/yb.py | 5 + migtests/tests/pg/sequences/cleanup-db | 0 migtests/tests/pg/sequences/env.sh | 2 +- .../pg/sequences/pg_sequences_automation.sql | 43 +++++++- migtests/tests/pg/sequences/validate | 102 +++++++++++++----- yb-voyager/cmd/common.go | 4 +- yb-voyager/src/srcdb/postgres.go | 30 +++--- 7 files changed, 145 insertions(+), 41 deletions(-) mode change 100644 => 100755 migtests/tests/pg/sequences/cleanup-db diff --git a/migtests/lib/yb.py b/migtests/lib/yb.py index 96d7112194..7e01a7540a 100644 --- a/migtests/lib/yb.py +++ b/migtests/lib/yb.py @@ -61,6 +61,11 @@ def table_exists(self, table_name, table_schema="public") -> bool: result = cur.fetchone() return result[0] + def count_tables(self, schema="public") -> int: + cur = self.conn.cursor() + cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'", (schema,)) + return cur.fetchone()[0] + def get_table_names(self, schema="public") -> List[str]: cur = self.conn.cursor() q = "SELECT table_name FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'" diff --git a/migtests/tests/pg/sequences/cleanup-db b/migtests/tests/pg/sequences/cleanup-db old mode 100644 new mode 100755 diff --git a/migtests/tests/pg/sequences/env.sh b/migtests/tests/pg/sequences/env.sh index 9ab68107ad..5580e3c7f6 100644 --- a/migtests/tests/pg/sequences/env.sh +++ b/migtests/tests/pg/sequences/env.sh @@ -1,3 +1,3 @@ export SOURCE_DB_TYPE="postgresql" export SOURCE_DB_NAME=${SOURCE_DB_NAME:-"pg_sequences"} -export SOURCE_DB_SCHEMA="public,schema1" +export SOURCE_DB_SCHEMA="public,schema1,schema2,schema3,schema4" diff --git a/migtests/tests/pg/sequences/pg_sequences_automation.sql b/migtests/tests/pg/sequences/pg_sequences_automation.sql index 4830c83ed5..2e4745f9f3 100644 --- a/migtests/tests/pg/sequences/pg_sequences_automation.sql +++ b/migtests/tests/pg/sequences/pg_sequences_automation.sql @@ -142,4 +142,45 @@ create table schema1.multiple_serial_columns ( ); insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('def', 10000, 'def', 10000); -insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('abc', 10000, 'abc', 10000); \ No newline at end of file +insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('abc', 10000, 'abc', 10000); + +-- Single Sequence attached to two columns of different table +create sequence baz; +create table foo(id bigint default nextval('baz'), value text); +create table bar(id bigint default nextval('baz'), value date); + +insert into foo (value) values ('Hello'); +insert into bar (value) values (now()); +insert into foo (value) values ('World'); +insert into bar (value) values (now()); + +-- Single Sequence attached to two columns of different table in non-public schema +create schema schema2; +create sequence schema2.baz; +create table schema2.foo(id bigint default nextval('schema2.baz'), value text); +create table schema2.bar(id bigint default nextval('schema2.baz'), value date); + +insert into schema2.foo (value) values ('Hello'); +insert into schema2.bar (value) values (now()); +insert into schema2.foo (value) values ('World'); +insert into schema2.bar (value) values (now()); + + +-- Single Sequence attached to two columns of different table in different schemas +create schema schema3; +create schema schema4; +create sequence schema3.baz; +create table schema3.foo(id bigint default nextval('schema3.baz'), value text); +create table schema4.bar(id bigint default nextval('schema3.baz'), value date); + +insert into schema3.foo (value) values ('Hello'); +insert into schema4.bar (value) values (now()); +insert into schema3.foo (value) values ('World'); +insert into schema4.bar (value) values (now()); + +-- Single Sequence attached to two columns of a table +create sequence foo_bar_baz; +create table foo_bar(id bigint default nextval('foo_bar_baz'), value text, id2 bigint default nextval('foo_bar_baz'), value2 text); + +insert into foo_bar (value, value2) values ('Hello', 'World'); +insert into foo_bar (value, value2) values ('World', 'Hello'); diff --git a/migtests/tests/pg/sequences/validate b/migtests/tests/pg/sequences/validate index e7c53a02c5..120c035113 100755 --- a/migtests/tests/pg/sequences/validate +++ b/migtests/tests/pg/sequences/validate @@ -15,6 +15,9 @@ EXPECTED_ROW_COUNT['public'] = { 'sequence_check3': 3, 'multiple_identity_columns': 2, 'multiple_serial_columns': 2, + 'foo': 2, + 'bar': 2, + 'foo_bar': 2, } EXPECTED_ROW_COUNT['schema1'] = { @@ -25,6 +28,19 @@ EXPECTED_ROW_COUNT['schema1'] = { 'multiple_serial_columns': 2, } +EXPECTED_ROW_COUNT['schema2'] = { + 'foo': 2, + 'bar': 2, +} + +EXPECTED_ROW_COUNT['schema3'] = { + 'foo': 2, +} + +EXPECTED_ROW_COUNT['schema4'] = { + 'bar': 2, +} + EXPECTED_TABLE_SUM = {} EXPECTED_TABLE_SUM['public'] = { 'sequence_check1': 3, @@ -32,6 +48,9 @@ EXPECTED_TABLE_SUM['public'] = { 'sequence_check3': 6, 'multiple_identity_columns': 3, 'multiple_serial_columns': 3, + 'foo': 4, + 'bar': 6, + 'foo_bar': 4, # two columns with same sequence used } EXPECTED_TABLE_SUM['schema1'] = { @@ -42,11 +61,44 @@ EXPECTED_TABLE_SUM['schema1'] = { 'multiple_serial_columns': 3, } +EXPECTED_TABLE_SUM['schema2'] = { + 'foo': 4, + 'bar': 6, +} + +EXPECTED_TABLE_SUM['schema3'] = { + 'foo': 4, +} + +EXPECTED_TABLE_SUM['schema4'] = { + 'bar': 6, +} + +NUM_TABLES = { + 'public': 8, + 'schema1': 5, + 'schema2': 2, + 'schema3': 1, + 'schema4': 1 +} + +NUM_SEQUENCES = { + 'public': 4+2, # 2 are implicit because of serial/generated columns + 'schema1': 1+3, # 3 are implicit because of serial/generated columns + 'schema2': 1, + 'schema3': 1, + 'schema4': 0 +} + def migration_completed_checks_per_schema(tgt, schema): print("check for schema: ", schema) - table_list = tgt.get_table_names(schema) - print("table_list: ", table_list) - assert len(table_list) == 5 + table_list = tgt.count_tables(schema) + print(f"actual table_list: {table_list}, expected num_tables: {NUM_TABLES[schema]}") + assert table_list == NUM_TABLES[schema] + + num_sequences = tgt.count_sequences(schema) + print(f"actual number_sequences: {num_sequences}, expected num_sequences: {NUM_SEQUENCES[schema]}") + assert num_sequences == NUM_SEQUENCES[schema] got_row_count = tgt.row_count_of_all_tables(schema) for table_name, row_count in EXPECTED_ROW_COUNT[schema].items(): @@ -57,30 +109,14 @@ def migration_completed_checks_per_schema(tgt, schema): got_sum_column_values = tgt.get_sum_of_column_of_table(table_name, "id", schema) print(f"table_name: {table_name}, got_sum_column_values: {got_sum_column_values}, expected_sum_column_values: {sum_column_values}") assert sum_column_values == got_sum_column_values - - num_sequences = tgt.count_sequences(schema) - print(f"Number of Sequences {num_sequences}") - assert num_sequences == 4 - - INSERT_SEQUENCE_QUERY = f"insert into {schema}.sequence_check3 (name) values ('Yugabyte');" - insert_query_chk_error = tgt.run_query_and_chk_error(INSERT_SEQUENCE_QUERY, None) - print(f"insert query returned for {schema}.sequence_check3 - {insert_query_chk_error}") - assert insert_query_chk_error == False - - - SELECT_ID_QUERY = f"select id from {schema}.sequence_check3 where name = 'Yugabyte';" - id_returned = tgt.execute_query(SELECT_ID_QUERY) - expected_id = EXPECTED_ROW_COUNT[schema]['sequence_check3'] + 1 - print(f"for sequence_check3, Id returned- {id_returned} and expected id - {expected_id}") - assert id_returned == expected_id # this validation check has been as added for issue - https://github.com/yugabyte/yb-voyager/issues/632 SEQUENCE_NAMES = ["sequence_check1_id_seq", "sequence_check2_id_seq", "sequence_check3_id_seq", "schema1.sequence_check1_id_seq", - "schema1.sequence_check2_id_seq", "schema1.sequence_check3_id_seq", "multiple_identity_columns_id_seq", - "multiple_serial_columns_id_seq", "schema1.multiple_identity_columns_id_seq", "schema1.multiple_serial_columns_id_seq"] + "schema1.sequence_check2_id_seq", "schema1.sequence_check3_id_seq", "multiple_identity_columns_id_seq", + "multiple_serial_columns_id_seq", "schema1.multiple_identity_columns_id_seq", "schema1.multiple_serial_columns_id_seq"] SEQUENCE_OWNER_COLUMNS = ["sequence_check1.id", "sequence_check2.id", "sequence_check3.id", "schema1.sequence_check1.id", - "schema1.sequence_check2.id", "schema1.sequence_check3.id", "multiple_identity_columns.id", - "multiple_serial_columns.id", "schema1.multiple_identity_columns.id", "schema1.multiple_serial_columns.id"] + "schema1.sequence_check2.id", "schema1.sequence_check3.id", "multiple_identity_columns.id", + "multiple_serial_columns.id", "schema1.multiple_identity_columns.id", "schema1.multiple_serial_columns.id"] for i in range(len(SEQUENCE_NAMES)): FETCH_SEQUENCE_OWNER_QUERY = f"""SELECT CONCAT(d.refobjid::regclass, '.', a.attname) AS owner_column @@ -95,10 +131,28 @@ def migration_completed_checks_per_schema(tgt, schema): print(f"fetched owner column of sequence {SEQUENCE_NAMES[i]} is: {SEQUENCE_OWNER_COLUMNS[i]}, expected owner: {FETCHED_SEQUENCE_OWNER_COLUMN}") assert FETCHED_SEQUENCE_OWNER_COLUMN == SEQUENCE_OWNER_COLUMNS[i] +def migration_completed_checks_for_sequences_per_schema(tgt, schema): + INSERT_SEQUENCE_QUERY = f"insert into {schema}.sequence_check3 (name) values ('Yugabyte');" + insert_query_chk_error = tgt.run_query_and_chk_error(INSERT_SEQUENCE_QUERY, None) + print(f"insert query returned for {schema}.sequence_check3 - {insert_query_chk_error}") + assert insert_query_chk_error == False + + + SELECT_ID_QUERY = f"select id from {schema}.sequence_check3 where name = 'Yugabyte';" + id_returned = tgt.execute_query(SELECT_ID_QUERY) + expected_id = EXPECTED_ROW_COUNT[schema]['sequence_check3'] + 1 + print(f"for sequence_check3, Id returned- {id_returned} and expected id - {expected_id}") + assert id_returned == expected_id + def migration_completed_checks(tgt): migration_completed_checks_per_schema(tgt, 'public') migration_completed_checks_per_schema(tgt, 'schema1') - + migration_completed_checks_per_schema(tgt, 'schema2') + migration_completed_checks_per_schema(tgt, 'schema3') + migration_completed_checks_per_schema(tgt, 'schema4') + + migration_completed_checks_for_sequences_per_schema(tgt, 'public') + migration_completed_checks_for_sequences_per_schema(tgt, 'schema1') if __name__ == "__main__": main() \ No newline at end of file diff --git a/yb-voyager/cmd/common.go b/yb-voyager/cmd/common.go index fa266f06e9..ee8d38952d 100644 --- a/yb-voyager/cmd/common.go +++ b/yb-voyager/cmd/common.go @@ -529,7 +529,6 @@ func getDefaultPGSchema(schema string) (string, bool) { } func CleanupChildProcesses() { - log.Info("Cleaning up child processes...") myPid := os.Getpid() processes, err := ps.Processes() if err != nil { @@ -538,6 +537,9 @@ func CleanupChildProcesses() { childProcesses := lo.Filter(processes, func(process ps.Process, _ int) bool { return process.PPid() == myPid }) + if len(childProcesses) != 0 { + log.Info("Cleaning up child processes...") + } for _, process := range childProcesses { pid := process.Pid() log.Infof("shutting down child pid %d", pid) diff --git a/yb-voyager/src/srcdb/postgres.go b/yb-voyager/src/srcdb/postgres.go index e2e8455acc..fe79369d24 100644 --- a/yb-voyager/src/srcdb/postgres.go +++ b/yb-voyager/src/srcdb/postgres.go @@ -397,22 +397,24 @@ func (pg *PostgreSQL) GetColumnToSequenceMap(tableList []*sqlname.SourceName) ma for _, table := range tableList { // query to find out column name vs sequence name for a table // this query also covers the case of identity columns - query := fmt.Sprintf(`SELECT a.attname AS column_name, s.relname AS sequence_name, - s.relnamespace::pg_catalog.regnamespace::text AS schema_name + query := fmt.Sprintf(`SELECT + a.attname AS column_name, + COALESCE(seq.relname, '') AS sequence_name, + COALESCE(ns.nspname, '') AS schema_name FROM pg_class AS t - JOIN pg_attribute AS a - ON a.attrelid = t.oid - JOIN pg_depend AS d - ON d.refobjid = t.oid - AND d.refobjsubid = a.attnum - JOIN pg_class AS s - ON s.oid = d.objid - WHERE d.classid = 'pg_catalog.pg_class'::regclass - AND d.refclassid = 'pg_catalog.pg_class'::regclass - AND d.deptype IN ('i', 'a') + JOIN pg_attribute AS a ON a.attrelid = t.oid + JOIN pg_namespace AS tn ON tn.oid = t.relnamespace + LEFT JOIN pg_attrdef AS ad ON ad.adrelid = t.oid AND ad.adnum = a.attnum + LEFT JOIN pg_depend AS d ON d.objid = ad.oid + LEFT JOIN pg_class AS seq ON seq.oid = d.refobjid + LEFT JOIN pg_namespace AS ns ON ns.oid = seq.relnamespace + WHERE + tn.nspname = '%s' -- schema name + AND t.relname = '%s' -- table name + AND a.attnum > 0 + AND NOT a.attisdropped AND t.relkind IN ('r', 'P') - AND s.relkind = 'S' - AND t.oid = '%s'::regclass;`, table.Qualified.MinQuoted) + AND seq.relkind = 'S';`, table.SchemaName.Unquoted, table.ObjectName.Unquoted) var columeName, sequenceName, schemaName string rows, err := pg.db.Query(context.Background(), query)