Skip to content

Commit

Permalink
Removing log line for cleaning child processes in case there is no ch…
Browse files Browse the repository at this point in the history
…ild process e.g. version cmd
  • Loading branch information
sanyamsinghal authored and [email protected] committed Jan 25, 2024
1 parent dd0cecd commit e2c028f
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 41 deletions.
5 changes: 5 additions & 0 deletions migtests/lib/yb.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def table_exists(self, table_name, table_schema="public") -> bool:
result = cur.fetchone()
return result[0]

def count_tables(self, schema="public") -> int:
cur = self.conn.cursor()
cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'", (schema,))
return cur.fetchone()[0]

def get_table_names(self, schema="public") -> List[str]:
cur = self.conn.cursor()
q = "SELECT table_name FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'"
Expand Down
Empty file modified migtests/tests/pg/sequences/cleanup-db
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion migtests/tests/pg/sequences/env.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export SOURCE_DB_TYPE="postgresql"
export SOURCE_DB_NAME=${SOURCE_DB_NAME:-"pg_sequences"}
export SOURCE_DB_SCHEMA="public,schema1"
export SOURCE_DB_SCHEMA="public,schema1,schema2,schema3,schema4"
43 changes: 42 additions & 1 deletion migtests/tests/pg/sequences/pg_sequences_automation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,45 @@ create table schema1.multiple_serial_columns (
);

insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('def', 10000, 'def', 10000);
insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('abc', 10000, 'abc', 10000);
insert into schema1.multiple_serial_columns(name, balance, name2, balance2) values('abc', 10000, 'abc', 10000);

-- Single Sequence attached to two columns of different table
create sequence baz;
create table foo(id bigint default nextval('baz'), value text);
create table bar(id bigint default nextval('baz'), value date);

insert into foo (value) values ('Hello');
insert into bar (value) values (now());
insert into foo (value) values ('World');
insert into bar (value) values (now());

-- Single Sequence attached to two columns of different table in non-public schema
create schema schema2;
create sequence schema2.baz;
create table schema2.foo(id bigint default nextval('schema2.baz'), value text);
create table schema2.bar(id bigint default nextval('schema2.baz'), value date);

insert into schema2.foo (value) values ('Hello');
insert into schema2.bar (value) values (now());
insert into schema2.foo (value) values ('World');
insert into schema2.bar (value) values (now());


-- Single Sequence attached to two columns of different table in different schemas
create schema schema3;
create schema schema4;
create sequence schema3.baz;
create table schema3.foo(id bigint default nextval('schema3.baz'), value text);
create table schema4.bar(id bigint default nextval('schema3.baz'), value date);

insert into schema3.foo (value) values ('Hello');
insert into schema4.bar (value) values (now());
insert into schema3.foo (value) values ('World');
insert into schema4.bar (value) values (now());

-- Single Sequence attached to two columns of a table
create sequence foo_bar_baz;
create table foo_bar(id bigint default nextval('foo_bar_baz'), value text, id2 bigint default nextval('foo_bar_baz'), value2 text);

insert into foo_bar (value, value2) values ('Hello', 'World');
insert into foo_bar (value, value2) values ('World', 'Hello');
102 changes: 78 additions & 24 deletions migtests/tests/pg/sequences/validate
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ EXPECTED_ROW_COUNT['public'] = {
'sequence_check3': 3,
'multiple_identity_columns': 2,
'multiple_serial_columns': 2,
'foo': 2,
'bar': 2,
'foo_bar': 2,
}

EXPECTED_ROW_COUNT['schema1'] = {
Expand All @@ -25,13 +28,29 @@ EXPECTED_ROW_COUNT['schema1'] = {
'multiple_serial_columns': 2,
}

EXPECTED_ROW_COUNT['schema2'] = {
'foo': 2,
'bar': 2,
}

EXPECTED_ROW_COUNT['schema3'] = {
'foo': 2,
}

EXPECTED_ROW_COUNT['schema4'] = {
'bar': 2,
}

EXPECTED_TABLE_SUM = {}
EXPECTED_TABLE_SUM['public'] = {
'sequence_check1': 3,
'sequence_check2': 28,
'sequence_check3': 6,
'multiple_identity_columns': 3,
'multiple_serial_columns': 3,
'foo': 4,
'bar': 6,
'foo_bar': 4, # two columns with same sequence used
}

EXPECTED_TABLE_SUM['schema1'] = {
Expand All @@ -42,11 +61,44 @@ EXPECTED_TABLE_SUM['schema1'] = {
'multiple_serial_columns': 3,
}

EXPECTED_TABLE_SUM['schema2'] = {
'foo': 4,
'bar': 6,
}

EXPECTED_TABLE_SUM['schema3'] = {
'foo': 4,
}

EXPECTED_TABLE_SUM['schema4'] = {
'bar': 6,
}

NUM_TABLES = {
'public': 8,
'schema1': 5,
'schema2': 2,
'schema3': 1,
'schema4': 1
}

NUM_SEQUENCES = {
'public': 4+2, # 2 are implicit because of serial/generated columns
'schema1': 1+3, # 3 are implicit because of serial/generated columns
'schema2': 1,
'schema3': 1,
'schema4': 0
}

def migration_completed_checks_per_schema(tgt, schema):
print("check for schema: ", schema)
table_list = tgt.get_table_names(schema)
print("table_list: ", table_list)
assert len(table_list) == 5
table_list = tgt.count_tables(schema)
print(f"actual table_list: {table_list}, expected num_tables: {NUM_TABLES[schema]}")
assert table_list == NUM_TABLES[schema]

num_sequences = tgt.count_sequences(schema)
print(f"actual number_sequences: {num_sequences}, expected num_sequences: {NUM_SEQUENCES[schema]}")
assert num_sequences == NUM_SEQUENCES[schema]

got_row_count = tgt.row_count_of_all_tables(schema)
for table_name, row_count in EXPECTED_ROW_COUNT[schema].items():
Expand All @@ -57,30 +109,14 @@ def migration_completed_checks_per_schema(tgt, schema):
got_sum_column_values = tgt.get_sum_of_column_of_table(table_name, "id", schema)
print(f"table_name: {table_name}, got_sum_column_values: {got_sum_column_values}, expected_sum_column_values: {sum_column_values}")
assert sum_column_values == got_sum_column_values

num_sequences = tgt.count_sequences(schema)
print(f"Number of Sequences {num_sequences}")
assert num_sequences == 4

INSERT_SEQUENCE_QUERY = f"insert into {schema}.sequence_check3 (name) values ('Yugabyte');"
insert_query_chk_error = tgt.run_query_and_chk_error(INSERT_SEQUENCE_QUERY, None)
print(f"insert query returned for {schema}.sequence_check3 - {insert_query_chk_error}")
assert insert_query_chk_error == False


SELECT_ID_QUERY = f"select id from {schema}.sequence_check3 where name = 'Yugabyte';"
id_returned = tgt.execute_query(SELECT_ID_QUERY)
expected_id = EXPECTED_ROW_COUNT[schema]['sequence_check3'] + 1
print(f"for sequence_check3, Id returned- {id_returned} and expected id - {expected_id}")
assert id_returned == expected_id

# this validation check has been as added for issue - https://github.com/yugabyte/yb-voyager/issues/632
SEQUENCE_NAMES = ["sequence_check1_id_seq", "sequence_check2_id_seq", "sequence_check3_id_seq", "schema1.sequence_check1_id_seq",
"schema1.sequence_check2_id_seq", "schema1.sequence_check3_id_seq", "multiple_identity_columns_id_seq",
"multiple_serial_columns_id_seq", "schema1.multiple_identity_columns_id_seq", "schema1.multiple_serial_columns_id_seq"]
"schema1.sequence_check2_id_seq", "schema1.sequence_check3_id_seq", "multiple_identity_columns_id_seq",
"multiple_serial_columns_id_seq", "schema1.multiple_identity_columns_id_seq", "schema1.multiple_serial_columns_id_seq"]
SEQUENCE_OWNER_COLUMNS = ["sequence_check1.id", "sequence_check2.id", "sequence_check3.id", "schema1.sequence_check1.id",
"schema1.sequence_check2.id", "schema1.sequence_check3.id", "multiple_identity_columns.id",
"multiple_serial_columns.id", "schema1.multiple_identity_columns.id", "schema1.multiple_serial_columns.id"]
"schema1.sequence_check2.id", "schema1.sequence_check3.id", "multiple_identity_columns.id",
"multiple_serial_columns.id", "schema1.multiple_identity_columns.id", "schema1.multiple_serial_columns.id"]

for i in range(len(SEQUENCE_NAMES)):
FETCH_SEQUENCE_OWNER_QUERY = f"""SELECT CONCAT(d.refobjid::regclass, '.', a.attname) AS owner_column
Expand All @@ -95,10 +131,28 @@ def migration_completed_checks_per_schema(tgt, schema):
print(f"fetched owner column of sequence {SEQUENCE_NAMES[i]} is: {SEQUENCE_OWNER_COLUMNS[i]}, expected owner: {FETCHED_SEQUENCE_OWNER_COLUMN}")
assert FETCHED_SEQUENCE_OWNER_COLUMN == SEQUENCE_OWNER_COLUMNS[i]

def migration_completed_checks_for_sequences_per_schema(tgt, schema):
INSERT_SEQUENCE_QUERY = f"insert into {schema}.sequence_check3 (name) values ('Yugabyte');"
insert_query_chk_error = tgt.run_query_and_chk_error(INSERT_SEQUENCE_QUERY, None)
print(f"insert query returned for {schema}.sequence_check3 - {insert_query_chk_error}")
assert insert_query_chk_error == False


SELECT_ID_QUERY = f"select id from {schema}.sequence_check3 where name = 'Yugabyte';"
id_returned = tgt.execute_query(SELECT_ID_QUERY)
expected_id = EXPECTED_ROW_COUNT[schema]['sequence_check3'] + 1
print(f"for sequence_check3, Id returned- {id_returned} and expected id - {expected_id}")
assert id_returned == expected_id

def migration_completed_checks(tgt):
migration_completed_checks_per_schema(tgt, 'public')
migration_completed_checks_per_schema(tgt, 'schema1')

migration_completed_checks_per_schema(tgt, 'schema2')
migration_completed_checks_per_schema(tgt, 'schema3')
migration_completed_checks_per_schema(tgt, 'schema4')

migration_completed_checks_for_sequences_per_schema(tgt, 'public')
migration_completed_checks_for_sequences_per_schema(tgt, 'schema1')

if __name__ == "__main__":
main()
4 changes: 3 additions & 1 deletion yb-voyager/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ func getDefaultPGSchema(schema string) (string, bool) {
}

func CleanupChildProcesses() {
log.Info("Cleaning up child processes...")
myPid := os.Getpid()
processes, err := ps.Processes()
if err != nil {
Expand All @@ -538,6 +537,9 @@ func CleanupChildProcesses() {
childProcesses := lo.Filter(processes, func(process ps.Process, _ int) bool {
return process.PPid() == myPid
})
if len(childProcesses) != 0 {
log.Info("Cleaning up child processes...")
}
for _, process := range childProcesses {
pid := process.Pid()
log.Infof("shutting down child pid %d", pid)
Expand Down
30 changes: 16 additions & 14 deletions yb-voyager/src/srcdb/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,22 +397,24 @@ func (pg *PostgreSQL) GetColumnToSequenceMap(tableList []*sqlname.SourceName) ma
for _, table := range tableList {
// query to find out column name vs sequence name for a table
// this query also covers the case of identity columns
query := fmt.Sprintf(`SELECT a.attname AS column_name, s.relname AS sequence_name,
s.relnamespace::pg_catalog.regnamespace::text AS schema_name
query := fmt.Sprintf(`SELECT
a.attname AS column_name,
COALESCE(seq.relname, '') AS sequence_name,
COALESCE(ns.nspname, '') AS schema_name
FROM pg_class AS t
JOIN pg_attribute AS a
ON a.attrelid = t.oid
JOIN pg_depend AS d
ON d.refobjid = t.oid
AND d.refobjsubid = a.attnum
JOIN pg_class AS s
ON s.oid = d.objid
WHERE d.classid = 'pg_catalog.pg_class'::regclass
AND d.refclassid = 'pg_catalog.pg_class'::regclass
AND d.deptype IN ('i', 'a')
JOIN pg_attribute AS a ON a.attrelid = t.oid
JOIN pg_namespace AS tn ON tn.oid = t.relnamespace
LEFT JOIN pg_attrdef AS ad ON ad.adrelid = t.oid AND ad.adnum = a.attnum
LEFT JOIN pg_depend AS d ON d.objid = ad.oid
LEFT JOIN pg_class AS seq ON seq.oid = d.refobjid
LEFT JOIN pg_namespace AS ns ON ns.oid = seq.relnamespace
WHERE
tn.nspname = '%s' -- schema name
AND t.relname = '%s' -- table name
AND a.attnum > 0
AND NOT a.attisdropped
AND t.relkind IN ('r', 'P')
AND s.relkind = 'S'
AND t.oid = '%s'::regclass;`, table.Qualified.MinQuoted)
AND seq.relkind = 'S';`, table.SchemaName.Unquoted, table.ObjectName.Unquoted)

var columeName, sequenceName, schemaName string
rows, err := pg.db.Query(context.Background(), query)
Expand Down

0 comments on commit e2c028f

Please sign in to comment.