diff --git a/alembic.ini b/alembic.ini index c727608..aeaf34a 100644 --- a/alembic.ini +++ b/alembic.ini @@ -52,8 +52,11 @@ sqlalchemy.url = postgresql://crmp@dbtest01.pcic.uvic.ca/crmp?keepalives=1&keepa [crmp_dbtest02_hx] sqlalchemy.url = postgresql://crmp@dbtest02.pcic.uvic.ca:5432/crmp_hx?keepalives=1&keepalives_idle=300&keepalives_interval=300&keepalives_count=9 -[crmp_prod] -sqlalchemy.url = postgresql://crmp@db.pcic.uvic.ca/crmp?keepalives=1&keepalives_idle=300&keepalives_interval=300&keepalives_count=9 +[CRMP_PROD] +sqlalchemy.url = postgresql://crmp@db.pcic.uvic.ca:5433/crmp?keepalives=1&keepalives_idle=300&keepalives_interval=300&keepalives_count=9 + +[CRMP_PROD_PCICDB03] +sqlalchemy.url = postgresql://crmp@pcicdb03.pcic.uvic.ca:5433/crmp?keepalives=1&keepalives_idle=300&keepalives_interval=300&keepalives_count=9 ;[test] ;sqlalchemy.url = ... diff --git a/pycds/alembic/change_history_utils.py b/pycds/alembic/change_history_utils.py index ed96449..05249b2 100644 --- a/pycds/alembic/change_history_utils.py +++ b/pycds/alembic/change_history_utils.py @@ -19,18 +19,24 @@ def qualified_name(name: str, schema=schema_name) -> str: return f"{prefix}{name}" -def pri_table_name(collection_name: str, schema=schema_name) -> str: +def main_table_name(collection_name: str, schema=schema_name) -> str: return qualified_name(collection_name, schema=schema) def hx_table_name(collection_name: str, **kwargs) -> str: - return f"{pri_table_name(collection_name, **kwargs)}_hx" + return f"{main_table_name(collection_name, **kwargs)}_hx" def hx_id_name(collection_name: str, **kwargs): return f"{collection_name}_hx_id" +def hx_id_seq_name(collection_name: str, **kwargs): + """Return the name of the sequence that generates the primary id of the history table. + This name is the default name generated by Postgres.""" + return f"{hx_table_name(collection_name)}_{hx_id_name(collection_name)}_seq" + + def sql_array(a: Iterable[Any]) -> str: return f"{{{', '.join(a)}}}" @@ -46,30 +52,30 @@ def add_history_cols_to_primary( # op.add_column can add only one column at a time. # Tables can be large, so for efficiency we add all columns in one command. add_columns = ", ".join(f"ADD COLUMN {c}" for c in columns) - op.execute(f"ALTER TABLE {pri_table_name(collection_name)} {add_columns}") + op.execute(f"ALTER TABLE {main_table_name(collection_name)} {add_columns}") def drop_history_cols_from_primary( collection_name: str, columns: tuple[str] = ("mod_time", "mod_user") ): drop_columns = ", ".join(f"DROP COLUMN {c}" for c in columns) - op.execute(f"ALTER TABLE {pri_table_name(collection_name)} {drop_columns}") + op.execute(f"ALTER TABLE {main_table_name(collection_name)} {drop_columns}") -def create_history_table(collection_name: str, foreign_keys: list[tuple[str, str]]): +def create_history_table(collection_name: str, foreign_tables: list[tuple[str, str]]): # Create the history table. We can't use Alembic create_table here because it doesn't # support the LIKE syntax we need. columns = ", ".join( ( - f" LIKE {pri_table_name(collection_name)} INCLUDING ALL EXCLUDING INDEXES", + f" LIKE {main_table_name(collection_name)} INCLUDING ALL EXCLUDING INDEXES", f" deleted boolean DEFAULT false", f" {hx_id_name(collection_name)} int " f" PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY", ) + tuple( - f"{hx_id_name(fk_table_name)} int " - f" REFERENCES {hx_table_name(fk_table_name)}({hx_id_name(fk_table_name)})" - for fk_table_name, _ in (foreign_keys or tuple()) + f"{hx_id_name(ft_table_name)} int " + f" REFERENCES {hx_table_name(ft_table_name)}({hx_id_name(ft_table_name)})" + for ft_table_name, _ in (foreign_tables or tuple()) ) ) op.execute(f"CREATE TABLE {hx_table_name(collection_name)} ({columns})") @@ -83,7 +89,7 @@ def drop_history_table(collection_name: str): def create_history_table_indexes( collection_name: str, pri_id_name: str, - foreign_keys: list[tuple[str, str]], + foreign_tables: list[tuple[str, str]], extras=None, ): """ @@ -94,10 +100,12 @@ def create_history_table_indexes( for columns in ( # Index on primary table primary key, mod_time, mod_user ([pri_id_name], ["mod_time"], ["mod_user"]) - # Index on all history foreign keys + # Index on all foreign main table primary keys + + tuple([ft_pk_name] for _, ft_pk_name in (foreign_tables or tuple())) + # Index on all foreign history table primary keys + tuple( - [hx_id_name(fk_table_name)] - for fk_table_name, _ in (foreign_keys or tuple()) + [hx_id_name(ft_table_name)] + for ft_table_name, _ in (foreign_tables or tuple()) ) + (extras or tuple()) ): @@ -111,58 +119,72 @@ def create_history_table_indexes( ) -def populate_history_table(collection_name: str, pri_id_name: str, limit: int = None): - # Populate the history table with data from the primary table, in order of primary id. - # That ordering guarantees that the newly generated history id's will be in the - # same order, which is required for it to be a valid history table. - op.execute( - f"INSERT INTO {hx_table_name(collection_name)} " - f"SELECT * " - f"FROM {pri_table_name(collection_name)} " - f"ORDER BY {pri_table_name(collection_name)}.{pri_id_name} " - f"LIMIT {limit or 'NULL'}" - ) - - -def update_obs_raw_history_FKs(suspend_synchronous_commit: bool = False): +def populate_history_table( + collection_name: str, + pri_id_name: str, + foreign_tables: list[tuple[str, str]], + limit: int = None, +): """ - Update the history FKs in obs_raw, in bulk. - - This method would be easy to generalize to other tables with different FK - collections, but at the time of writing, only obs_raw needs bulk FK updates, and we - already have the query in hand. + Populate the history table with data from the main table, in order of item id (main + table primary key). + That ordering guarantees that the newly generated history id's will be in the + same order, which is required for it to be a valid history table. + We include the history FKs in the initial population, because to do it any other + way proves infeasible for large tables (obs_raw) in memory and/or time usage. """ - synchronous_commit = op.get_bind().execute("show synchronous_commit").scalar() - print("## synchronous_commit", synchronous_commit) - if suspend_synchronous_commit: - synchronous_commit = op.get_bind().execute("show synchronous_commit").scalar() - print("## synchronous_commit", synchronous_commit) - op.execute("SET synchronous_commit = off") + # Foreign tables are used in common table expressions (CTEs) that provide the latest + # foreign table history id's. A series of related objects are generated from the + # foreign table definitions: the CTE names, the CTE definitions, and their usages + # within the query that populates the target history table. - # TODO: Rewrite as SA query? - op.execute( - """ - WITH v as ( - SELECT vars_id, max(meta_vars_hx_id) latest - FROM meta_vars_hx - GROUP BY vars_id - ), - h as ( - SELECT history_id, max(meta_history_hx_id) latest - FROM meta_history_hx - GROUP BY history_id - ) - UPDATE obs_raw_hx o - SET meta_vars_hx_id = v.latest, meta_history_hx_id = h.latest - FROM v, h - WHERE o.vars_id = v.vars_id - AND o.history_id = h.history_id + foreign_tables = foreign_tables or tuple() + + conditional_comma = "," if len(foreign_tables) > 0 else "" + + ft_cte_names = tuple( + f"{ft_table_name}_latest" for ft_table_name, ft_pk_name in foreign_tables + ) + ft_cte_name_list = ", ".join(ft_cte_names) + + ft_cte_defns = tuple( + f""" + {cte_name} AS ( + SELECT {ft_pk_name}, max({hx_id_name(ft_table_name)}) val + FROM {hx_table_name(ft_table_name)} + GROUP BY {ft_pk_name} + ) """ + for cte_name, (ft_table_name, ft_pk_name) in zip(ft_cte_names, foreign_tables) ) + ft_cte_list = ", ".join(ft_cte_defns) + + ft_cte_value_list = ", ".join(f"{cte_name}.val" for cte_name in ft_cte_names) - if suspend_synchronous_commit: - op.execute(f"SET synchronous_commit = {synchronous_commit}") + ft_where_conditions = tuple( + f"main.{ft_pk_name} = {cte_name}.{ft_pk_name}" + for cte_name, (ft_table_name, ft_pk_name) in zip(ft_cte_names, foreign_tables) + ) + ft_where_clause = ( + f"WHERE {' AND '.join(ft_where_conditions)}" + if len(ft_where_conditions) > 0 + else "" + ) + + stmt = f""" + {"WITH" if len(foreign_tables) > 0 else ""} + {ft_cte_list} + INSERT INTO {hx_table_name(collection_name)} + SELECT main.*, false, + nextval('{hx_id_seq_name(collection_name)}'::regclass) + {conditional_comma} {ft_cte_value_list} + FROM {main_table_name(collection_name)} main + {conditional_comma} {ft_cte_name_list} + {ft_where_clause} + ORDER BY main.{pri_id_name} + """ + op.execute(stmt) def create_primary_table_triggers(collection_name: str, prefix: str = "t100_"): @@ -170,7 +192,7 @@ def create_primary_table_triggers(collection_name: str, prefix: str = "t100_"): op.execute( f"CREATE TRIGGER {prefix}primary_control_hx_cols " f" BEFORE INSERT OR DELETE OR UPDATE " - f" ON {pri_table_name(collection_name)} " + f" ON {main_table_name(collection_name)} " f" FOR EACH ROW " f" EXECUTE FUNCTION {qualified_name('hxtk_primary_control_hx_cols')}()" ) @@ -179,19 +201,19 @@ def create_primary_table_triggers(collection_name: str, prefix: str = "t100_"): op.execute( f"CREATE TRIGGER {prefix}primary_ops_to_hx " f" AFTER INSERT OR DELETE OR UPDATE " - f" ON {pri_table_name(collection_name)} " + f" ON {main_table_name(collection_name)} " f" FOR EACH ROW " f" EXECUTE FUNCTION {qualified_name('hxtk_primary_ops_to_hx')}()" ) def create_history_table_triggers( - collection_name: str, foreign_keys: list, prefix: str = "t100_" + collection_name: str, foreign_tables: list, prefix: str = "t100_" ): # Trigger: Add foreign key values to each record inserted into history table. - fk_args = ( - f"'{sql_array(sql_array(pair) for pair in foreign_keys)}'" - if foreign_keys + ft_args = ( + f"'{sql_array(sql_array(pair) for pair in foreign_tables)}'" + if foreign_tables else "" ) op.execute( @@ -199,18 +221,18 @@ def create_history_table_triggers( f" BEFORE INSERT " f" ON {hx_table_name(collection_name)} " f" FOR EACH ROW " - f" EXECUTE FUNCTION {qualified_name('hxtk_add_foreign_hx_keys')}({fk_args})" + f" EXECUTE FUNCTION {qualified_name('hxtk_add_foreign_hx_keys')}({ft_args})" ) def drop_history_triggers(collection_name: str, prefix: str = "t100_"): op.execute( f"DROP TRIGGER {prefix}primary_control_hx_cols " - f"ON {pri_table_name(collection_name)}" + f"ON {main_table_name(collection_name)}" ) op.execute( f"DROP TRIGGER {prefix}primary_ops_to_hx " - f"ON {pri_table_name(collection_name)}" + f"ON {main_table_name(collection_name)}" ) op.execute( f"DROP TRIGGER {prefix}add_foreign_hx_keys " diff --git a/pycds/alembic/versions/8c05da87cb79_add_hx_tkg_to_obs_raw.py b/pycds/alembic/versions/8c05da87cb79_add_hx_tkg_to_obs_raw.py index 94290c8..090e25b 100644 --- a/pycds/alembic/versions/8c05da87cb79_add_hx_tkg_to_obs_raw.py +++ b/pycds/alembic/versions/8c05da87cb79_add_hx_tkg_to_obs_raw.py @@ -20,8 +20,7 @@ create_primary_table_triggers, create_history_table_indexes, hx_table_name, - update_obs_raw_history_FKs, - pri_table_name, + main_table_name, ) from pycds.alembic.util import grant_standard_table_privileges @@ -37,7 +36,7 @@ table_name = "obs_raw" primary_key_name = "obs_raw_id" -foreign_keys = [("meta_history", "history_id"), ("meta_vars", "vars_id")] +foreign_tables = [("meta_history", "history_id"), ("meta_vars", "vars_id")] def upgrade(): @@ -59,25 +58,23 @@ def upgrade(): ) # Existing trigger on obs_raw is superseded by the hx tracking trigger. op.execute( - f"DROP TRIGGER IF EXISTS update_mod_time ON {pri_table_name(table_name)}" + f"DROP TRIGGER IF EXISTS update_mod_time ON {main_table_name(table_name)}" ) create_primary_table_triggers(table_name) # History table #### - create_history_table(table_name, foreign_keys) - - # Populate the history table, then update its history FKs in bulk. - # If we let the FK trigger do this work, fired row-by-row on ~1e9 records, - # it requires an unfeasible amount of time, so we do it in bulk. - populate_history_table(table_name, primary_key_name) - update_obs_raw_history_FKs() - + create_history_table(table_name, foreign_tables) + # Populate the history table. History FKs are included in the initial table + # population. It must be done this way: Doing it after population, in bulk, causes + # memory overflows (UPDATEs use a lot of memory). Doing it piecemeal, via the + # triggers, on 1e9 records is completely time infeasible. + populate_history_table(table_name, primary_key_name, foreign_tables) # History table triggers must be created after the table is populated. - create_history_table_triggers(table_name, foreign_keys) - - create_history_table_indexes(table_name, primary_key_name, foreign_keys) + create_history_table_triggers(table_name, foreign_tables) + # Indexes are better created after table is populated than before. + create_history_table_indexes(table_name, primary_key_name, foreign_tables) grant_standard_table_privileges(table_name, schema=schema_name) @@ -88,7 +85,7 @@ def downgrade(): op.execute( f"CREATE TRIGGER update_mod_time" f" BEFORE UPDATE" - f" ON {pri_table_name(table_name)}" + f" ON {main_table_name(table_name)}" f" FOR EACH ROW" f" EXECUTE FUNCTION public.moddatetime('mod_time')" ) diff --git a/pycds/alembic/versions/a59d64cf16ca_add_hx_tkg_to_main_metadata_tables.py b/pycds/alembic/versions/a59d64cf16ca_add_hx_tkg_to_main_metadata_tables.py index 2b15da7..b691aac 100644 --- a/pycds/alembic/versions/a59d64cf16ca_add_hx_tkg_to_main_metadata_tables.py +++ b/pycds/alembic/versions/a59d64cf16ca_add_hx_tkg_to_main_metadata_tables.py @@ -45,19 +45,19 @@ def upgrade(): # the history table is populated can find the functions that they call. op.get_bind().execute(f"SET search_path TO {schema_name}, public") - for table_name, primary_key_name, foreign_keys, extra_indexes in table_info: + for table_name, primary_key_name, foreign_tables, extra_indexes in table_info: # Primary table add_history_cols_to_primary(table_name) create_primary_table_triggers(table_name) # History table - create_history_table(table_name, foreign_keys) + create_history_table(table_name, foreign_tables) + populate_history_table(table_name, primary_key_name, foreign_tables) + # History table triggers must be created after the table is populated. + create_history_table_triggers(table_name, foreign_tables) create_history_table_indexes( - table_name, primary_key_name, foreign_keys, extra_indexes + table_name, primary_key_name, foreign_tables, extra_indexes ) - # History table triggers must be created before the table is populated. - create_history_table_triggers(table_name, foreign_keys) - populate_history_table(table_name, primary_key_name) grant_standard_table_privileges(table_name, schema=schema_name) diff --git a/tests/alembic_migrations/helpers.py b/tests/alembic_migrations/helpers.py index e236267..0c8944d 100644 --- a/tests/alembic_migrations/helpers.py +++ b/tests/alembic_migrations/helpers.py @@ -18,7 +18,7 @@ from sqlalchemy.types import TIMESTAMP, VARCHAR, BOOLEAN, INTEGER from pycds.alembic.change_history_utils import hx_id_name -from pycds.alembic.change_history_utils import pri_table_name, hx_table_name +from pycds.alembic.change_history_utils import main_table_name, hx_table_name from pycds.database import get_schema_item_names @@ -57,7 +57,7 @@ def check_history_tracking_upgrade( engine, table_name: str, pri_key_name: str, - foreign_keys: list[tuple[str, str]], + foreign_tables: list[tuple[str, str]], schema_name: str, pri_columns_added: tuple[tuple[str, sqlalchemy.types]] = ( ("mod_time", TIMESTAMP), @@ -70,24 +70,24 @@ def check_history_tracking_upgrade( names = set(get_schema_item_names(engine, "tables", schema_name=schema_name)) metadata = MetaData(schema=schema_name, bind=engine) - # Primary table: columns added - pri_name = pri_table_name(table_name, schema=None) - assert pri_name in names - pri_table = Table(pri_name, metadata, autoload_with=engine) + # Main table: columns added + main_name = main_table_name(table_name, schema=None) + assert main_name in names + main_table = Table(main_name, metadata, autoload_with=engine) for col_name, col_type in pri_columns_added: - check_column(pri_table, col_name, col_type) + check_column(main_table, col_name, col_type) # History table columns: primary plus additional columns hx_name = hx_table_name(table_name, schema=None) assert hx_name in names hx_table = Table(hx_name, metadata, autoload_with=engine) - for col in pri_table.columns: + for col in main_table.columns: check_column(hx_table, col.name, col.type.__class__) check_column(hx_table, "deleted", BOOLEAN) check_column(hx_table, hx_id_name(table_name), INTEGER) - for fk_table_name, fk_key_name in foreign_keys: - check_column(hx_table, fk_key_name, INTEGER) - check_column(hx_table, hx_id_name(fk_table_name), INTEGER) + for ft_table_name, ft_key_name in foreign_tables: + check_column(hx_table, ft_key_name, INTEGER) + check_column(hx_table, hx_id_name(ft_table_name), INTEGER) # History table indexes. This test does not check index type, but it # does check what columns are in each index. @@ -95,13 +95,15 @@ def check_history_tracking_upgrade( (pri_key_name,), ("mod_time",), ("mod_user",), - } | { - (hx_id_name(fk_table_name),) for fk_table_name, _ in (foreign_keys or tuple()) - } == {tuple(c.name for c in index.columns) for index in hx_table.indexes} + } | {(ft_pk_name,) for _, ft_pk_name in (foreign_tables or tuple())} | { + (hx_id_name(ft_table_name),) for ft_table_name, _ in (foreign_tables or tuple()) + } == { + tuple(c.name for c in index.columns) for index in hx_table.indexes + } # Triggers check_triggers( - pri_table, + main_table, [ ( f"{tg_prefix}primary_control_hx_cols", @@ -142,9 +144,9 @@ def check_history_tracking_downgrade( metadata = MetaData(schema=schema_name, bind=engine) # Primary table: columns dropped - pri_name = pri_table_name(table_name, schema=None) - assert pri_name in names - pri_table = Table(pri_name, metadata, autoload_with=engine) + main_name = main_table_name(table_name, schema=None) + assert main_name in names + pri_table = Table(main_name, metadata, autoload_with=engine) for column in pri_columns_dropped: check_column(pri_table, column, present=False) diff --git a/tests/alembic_migrations/versions/v_8c05da87cb79_add_hx_tkg_to_obs_raw/test_smoke.py b/tests/alembic_migrations/versions/v_8c05da87cb79_add_hx_tkg_to_obs_raw/test_smoke.py index 8f2f65e..518007b 100644 --- a/tests/alembic_migrations/versions/v_8c05da87cb79_add_hx_tkg_to_obs_raw/test_smoke.py +++ b/tests/alembic_migrations/versions/v_8c05da87cb79_add_hx_tkg_to_obs_raw/test_smoke.py @@ -7,7 +7,11 @@ from sqlalchemy import Table, MetaData, text from sqlalchemy.types import TIMESTAMP, VARCHAR, BOOLEAN, INTEGER -from pycds.alembic.change_history_utils import pri_table_name, hx_table_name, hx_id_name +from pycds.alembic.change_history_utils import ( + main_table_name, + hx_table_name, + hx_id_name, +) from pycds.database import get_schema_item_names from tests.alembic_migrations.helpers import ( check_history_tracking_upgrade, @@ -19,7 +23,7 @@ table_name = "obs_raw" primary_key_name = "obs_raw_id" -foreign_keys = [("meta_history", "history_id"), ("meta_vars", "vars_id")] +foreign_tables = [("meta_history", "history_id"), ("meta_vars", "vars_id")] @pytest.mark.usefixtures("new_db_left") @@ -36,7 +40,7 @@ def test_upgrade( engine, table_name, primary_key_name, - foreign_keys, + foreign_tables, schema_name, pri_columns_added=(("mod_user", VARCHAR),), ) diff --git a/tests/alembic_migrations/versions/v_a59d64cf16ca_add_hx_tkg_to_main_metadata_tables/test_smoke.py b/tests/alembic_migrations/versions/v_a59d64cf16ca_add_hx_tkg_to_main_metadata_tables/test_smoke.py index 9430a5c..d9139dc 100644 --- a/tests/alembic_migrations/versions/v_a59d64cf16ca_add_hx_tkg_to_main_metadata_tables/test_smoke.py +++ b/tests/alembic_migrations/versions/v_a59d64cf16ca_add_hx_tkg_to_main_metadata_tables/test_smoke.py @@ -7,7 +7,11 @@ from sqlalchemy import Table, MetaData, text from sqlalchemy.types import TIMESTAMP, VARCHAR, BOOLEAN, INTEGER -from pycds.alembic.change_history_utils import pri_table_name, hx_table_name, hx_id_name +from pycds.alembic.change_history_utils import ( + main_table_name, + hx_table_name, + hx_id_name, +) from pycds.database import get_schema_item_names from tests.alembic_migrations.helpers import ( check_history_tracking_upgrade, @@ -35,9 +39,9 @@ def test_upgrade( engine, script = prepared_schema_from_migrations_left # Check that tables have been altered or created as expected. - for table_name, pri_key_name, foreign_keys in table_info: + for table_name, pri_key_name, foreign_tables in table_info: check_history_tracking_upgrade( - engine, table_name, pri_key_name, foreign_keys, schema_name + engine, table_name, pri_key_name, foreign_tables, schema_name )