Add FKs in populate step of migration #238

Merged: 8 commits, Feb 14, 2025
Changes from 4 commits
109 changes: 86 additions & 23 deletions pycds/alembic/change_history_utils.py
@@ -31,6 +31,12 @@ def hx_id_name(collection_name: str, **kwargs):
return f"{collection_name}_hx_id"


+ def hx_id_seq_name(collection_name: str, **kwargs):
+ """Return the name of the sequence that generates the primary id of the history table.
+ This name is the default name generated by Postgres."""
+ return f"{hx_table_name(collection_name)}_{hx_id_name(collection_name)}_seq"

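For context: Postgres names the implicit sequence behind an identity (or serial) column <table>_<column>_seq, which is what this helper reproduces. A minimal sketch of what it yields for obs_raw, assuming hx_table_name (defined above this excerpt) appends an _hx suffix:

def hx_table_name(collection_name: str) -> str:
    # Assumed stand-in for the real helper, which is outside this excerpt.
    return f"{collection_name}_hx"

def hx_id_name(collection_name: str) -> str:
    # As shown above in this file.
    return f"{collection_name}_hx_id"

def hx_id_seq_name(collection_name: str) -> str:
    return f"{hx_table_name(collection_name)}_{hx_id_name(collection_name)}_seq"

# Matches the default name Postgres generates for the identity column's sequence:
assert hx_id_seq_name("obs_raw") == "obs_raw_hx_obs_raw_hx_id_seq"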

def sql_array(a: Iterable[Any]) -> str:
return f"{{{', '.join(a)}}}"

@@ -56,7 +62,7 @@ def drop_history_cols_from_primary(
op.execute(f"ALTER TABLE {pri_table_name(collection_name)} {drop_columns}")


- def create_history_table(collection_name: str, foreign_keys: list[tuple[str, str]]):
+ def create_history_table(collection_name: str, foreign_tables: list[tuple[str, str]]):
# Create the history table. We can't use Alembic create_table here because it doesn't
# support the LIKE syntax we need.
columns = ", ".join(
@@ -67,9 +73,9 @@ def create_history_table(collection_name: str, foreign_keys: list[tuple[str, str
f" PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY",
)
+ tuple(
f"{hx_id_name(fk_table_name)} int "
f" REFERENCES {hx_table_name(fk_table_name)}({hx_id_name(fk_table_name)})"
for fk_table_name, _ in (foreign_keys or tuple())
f"{hx_id_name(ft_table_name)} int "
f" REFERENCES {hx_table_name(ft_table_name)}({hx_id_name(ft_table_name)})"
for ft_table_name, _ in (foreign_tables or tuple())
)
)
op.execute(f"CREATE TABLE {hx_table_name(collection_name)} ({columns})")
@@ -83,7 +89,7 @@ def drop_history_table(collection_name: str):
def create_history_table_indexes(
collection_name: str,
pri_id_name: str,
- foreign_keys: list[tuple[str, str]],
+ foreign_tables: list[tuple[str, str]],
extras=None,
):
"""
@@ -94,10 +100,12 @@ def create_history_table_indexes(
for columns in (
# Index on primary table primary key, mod_time, mod_user
([pri_id_name], ["mod_time"], ["mod_user"])
- # Index on all history foreign keys
+ # Index on all foreign main table primary keys
+ + tuple([ft_pk_name] for _, ft_pk_name in (foreign_tables or tuple()))
+ # Index on all foreign history table primary keys
+ tuple(
- [hx_id_name(fk_table_name)]
- for fk_table_name, _ in (foreign_keys or tuple())
+ [hx_id_name(ft_table_name)]
+ for ft_table_name, _ in (foreign_tables or tuple())
)
+ (extras or tuple())
):
@@ -111,18 +119,73 @@
)


- def populate_history_table(collection_name: str, pri_id_name: str, limit: int = None):
- # Populate the history table with data from the primary table, in order of primary id.
- # That ordering guarantees that the newly generated history id's will be in the
- # same order, which is required for it to be a valid history table.
- op.execute(
- f"INSERT INTO {hx_table_name(collection_name)} "
- f"SELECT * "
- f"FROM {pri_table_name(collection_name)} "
- f"ORDER BY {pri_table_name(collection_name)}.{pri_id_name} "
- f"LIMIT {limit or 'NULL'}"
+ def populate_history_table(
+ collection_name: str,
+ pri_id_name: str,
+ foreign_tables: list[tuple[str, str]],
+ limit: int = None,
+ ):
+ """
+ Populate the history table with data from the main table, in order of item id (main
+ table primary key).
+ That ordering guarantees that the newly generated history ids will be in the
+ same order, which is required for it to be a valid history table.
+ We include the history FKs in the initial population because doing it any other
+ way proves infeasible for large tables (obs_raw) in memory and/or time.
+ """
+
+ # Foreign tables are used in common table expressions (CTEs) that provide the latest
+ # foreign table history ids. Several related objects are generated from the
+ # foreign table definitions: the CTE names, the CTE definitions, and their usages
+ # within the query that populates the target history table.
+
+ foreign_tables = foreign_tables or tuple()
+
+ conditional_comma = "," if len(foreign_tables) > 0 else ""
+
+ ft_cte_names = tuple(
+ f"{ft_table_name}_latest" for ft_table_name, ft_pk_name in foreign_tables
+ )
+ ft_cte_name_list = ", ".join(ft_cte_names)
+
+ ft_cte_defns = tuple(
+ f"""
+ {cte_name} AS (
+ SELECT {ft_pk_name}, max({hx_id_name(ft_table_name)}) val
+ FROM {hx_table_name(ft_table_name)}
+ GROUP BY {ft_pk_name}
+ )
+ """
+ for cte_name, (ft_table_name, ft_pk_name) in zip(ft_cte_names, foreign_tables)
+ )
+ ft_cte_list = ", ".join(ft_cte_defns)
+
+ ft_cte_value_list = ", ".join(f"{cte_name}.val" for cte_name in ft_cte_names)
+
+ ft_where_conditions = tuple(
+ f"main.{ft_pk_name} = {cte_name}.{ft_pk_name}"
+ for cte_name, (ft_table_name, ft_pk_name) in zip(ft_cte_names, foreign_tables)
+ )
+ ft_where_clause = (
+ f"WHERE {' AND '.join(ft_where_conditions)}"
+ if len(ft_where_conditions) > 0
+ else ""
+ )
+
+ stmt = f"""
+ {"WITH" if len(foreign_tables) > 0 else ""}
+ {ft_cte_list}
+ INSERT INTO {hx_table_name(collection_name)}
+ SELECT main.*, false,
+ nextval('{hx_id_seq_name(collection_name)}'::regclass)
+ {conditional_comma} {ft_cte_value_list}
+ FROM {pri_table_name(collection_name)} main
+ {conditional_comma} {ft_cte_name_list}
+ {ft_where_clause}
+ ORDER BY main.{pri_id_name}
+ """
+ op.execute(stmt)

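Hand-expanding the template above for obs_raw (two foreign tables; the _hx naming convention and the values from the test module below are assumed) gives, modulo whitespace and schema qualification, roughly this statement:

WITH
meta_history_latest AS (
    SELECT history_id, max(meta_history_hx_id) val
    FROM meta_history_hx
    GROUP BY history_id
),
meta_vars_latest AS (
    SELECT vars_id, max(meta_vars_hx_id) val
    FROM meta_vars_hx
    GROUP BY vars_id
)
INSERT INTO obs_raw_hx
SELECT main.*, false,
    nextval('obs_raw_hx_obs_raw_hx_id_seq'::regclass),
    meta_history_latest.val, meta_vars_latest.val
FROM obs_raw main, meta_history_latest, meta_vars_latest
WHERE main.history_id = meta_history_latest.history_id
    AND main.vars_id = meta_vars_latest.vars_id
ORDER BY main.obs_raw_id

Note the comma joins are inner joins, so every main-table row must already have matching rows in the foreign history tables; presumably the migration ordering guarantees those are populated first.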

def update_obs_raw_history_FKs(suspend_synchronous_commit: bool = False):
"""
@@ -186,20 +249,20 @@ def create_primary_table_triggers(collection_name: str, prefix: str = "t100_"):


def create_history_table_triggers(
- collection_name: str, foreign_keys: list, prefix: str = "t100_"
+ collection_name: str, foreign_tables: list, prefix: str = "t100_"
):
# Trigger: Add foreign key values to each record inserted into history table.
- fk_args = (
- f"'{sql_array(sql_array(pair) for pair in foreign_keys)}'"
- if foreign_keys
+ ft_args = (
+ f"'{sql_array(sql_array(pair) for pair in foreign_tables)}'"
+ if foreign_tables
else ""
)
op.execute(
f"CREATE TRIGGER {prefix}add_foreign_hx_keys "
f" BEFORE INSERT "
f" ON {hx_table_name(collection_name)} "
f" FOR EACH ROW "
f" EXECUTE FUNCTION {qualified_name('hxtk_add_foreign_hx_keys')}({fk_args})"
f" EXECUTE FUNCTION {qualified_name('hxtk_add_foreign_hx_keys')}({ft_args})"
)
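The nested sql_array call builds a two-dimensional Postgres array literal that is passed to the trigger function as a single quoted argument. For obs_raw's foreign tables it works out as follows (a runnable sketch using the sql_array definition from the top of this file):

from typing import Any, Iterable

def sql_array(a: Iterable[Any]) -> str:
    # Copied from earlier in this file.
    return f"{{{', '.join(a)}}}"

foreign_tables = [("meta_history", "history_id"), ("meta_vars", "vars_id")]
ft_args = f"'{sql_array(sql_array(pair) for pair in foreign_tables)}'"
print(ft_args)
# '{{meta_history, history_id}, {meta_vars, vars_id}}'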


14 changes: 6 additions & 8 deletions pycds/alembic/versions/8c05da87cb79_add_hx_tkg_to_obs_raw.py
@@ -67,16 +67,14 @@ def upgrade():
####

create_history_table(table_name, foreign_keys)

- # Populate the history table, then update its history FKs in bulk.
- # If we let the FK trigger do this work, fired row-by-row on ~1e9 records,
- # it requires an unfeasible amount of time, so we do it in bulk.
- populate_history_table(table_name, primary_key_name)
- update_obs_raw_history_FKs()

+ # Populate the history table. History FKs are included in the initial table
+ # population. It must be done this way: doing it after population, in bulk, causes
+ # memory overflows (UPDATEs use a lot of memory), and doing it piecemeal, via the
+ # triggers, on ~1e9 records is infeasible in time.
+ populate_history_table(table_name, primary_key_name, foreign_keys)
# History table triggers must be created after the table is populated.
create_history_table_triggers(table_name, foreign_keys)

# Indexes are better created after table is populated than before.
create_history_table_indexes(table_name, primary_key_name, foreign_keys)
grant_standard_table_privileges(table_name, schema=schema_name)

@@ -45,19 +45,19 @@ def upgrade():
# the history table is populated can find the functions that they call.
op.get_bind().execute(f"SET search_path TO {schema_name}, public")

- for table_name, primary_key_name, foreign_keys, extra_indexes in table_info:
+ for table_name, primary_key_name, foreign_tables, extra_indexes in table_info:
# Primary table
add_history_cols_to_primary(table_name)
create_primary_table_triggers(table_name)

# History table
- create_history_table(table_name, foreign_keys)
+ create_history_table(table_name, foreign_tables)
+ populate_history_table(table_name, primary_key_name, foreign_tables)
+ # History table triggers must be created after the table is populated.
+ create_history_table_triggers(table_name, foreign_tables)
create_history_table_indexes(
- table_name, primary_key_name, foreign_keys, extra_indexes
+ table_name, primary_key_name, foreign_tables, extra_indexes
)
- # History table triggers must be created before the table is populated.
- create_history_table_triggers(table_name, foreign_keys)
- populate_history_table(table_name, primary_key_name)
grant_standard_table_privileges(table_name, schema=schema_name)
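table_info itself is outside this excerpt; given how the loop unpacks it, each entry presumably has this shape (hypothetical values, for illustration only):

table_info = [
    # (table_name, primary_key_name, foreign_tables, extra_indexes)
    ("meta_history", "history_id", [("meta_station", "station_id")], ()),
]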


16 changes: 9 additions & 7 deletions tests/alembic_migrations/helpers.py
@@ -57,7 +57,7 @@ def check_history_tracking_upgrade(
engine,
table_name: str,
pri_key_name: str,
- foreign_keys: list[tuple[str, str]],
+ foreign_tables: list[tuple[str, str]],
schema_name: str,
pri_columns_added: tuple[tuple[str, sqlalchemy.types]] = (
("mod_time", TIMESTAMP),
@@ -85,19 +85,21 @@ check_column(hx_table, col.name, col.type.__class__)
check_column(hx_table, col.name, col.type.__class__)
check_column(hx_table, "deleted", BOOLEAN)
check_column(hx_table, hx_id_name(table_name), INTEGER)
- for fk_table_name, fk_key_name in foreign_keys:
- check_column(hx_table, fk_key_name, INTEGER)
- check_column(hx_table, hx_id_name(fk_table_name), INTEGER)
+ for ft_table_name, ft_key_name in foreign_tables:
+ check_column(hx_table, ft_key_name, INTEGER)
+ check_column(hx_table, hx_id_name(ft_table_name), INTEGER)

# History table indexes. This test does not check index type, but it
# does check what columns are in each index.
assert {
(pri_key_name,),
("mod_time",),
("mod_user",),
- } | {
- (hx_id_name(fk_table_name),) for fk_table_name, _ in (foreign_keys or tuple())
- } == {tuple(c.name for c in index.columns) for index in hx_table.indexes}
+ } | {(ft_pk_name,) for _, ft_pk_name in (foreign_tables or tuple())} | {
+ (hx_id_name(ft_table_name),) for ft_table_name, _ in (foreign_tables or tuple())
+ } == {
+ tuple(c.name for c in index.columns) for index in hx_table.indexes
+ }
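For obs_raw, whose parameters appear in the test module below, the expected left-hand set works out as follows (a worked sketch):

def hx_id_name(name: str) -> str:  # as defined in change_history_utils
    return f"{name}_hx_id"

foreign_tables = [("meta_history", "history_id"), ("meta_vars", "vars_id")]
expected = (
    {("obs_raw_id",), ("mod_time",), ("mod_user",)}
    | {(ft_pk_name,) for _, ft_pk_name in foreign_tables}
    | {(hx_id_name(ft_table_name),) for ft_table_name, _ in foreign_tables}
)
# expected == {("obs_raw_id",), ("mod_time",), ("mod_user",),
#              ("history_id",), ("vars_id",),
#              ("meta_history_hx_id",), ("meta_vars_hx_id",)}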

# Triggers
check_triggers(
@@ -19,7 +19,7 @@

table_name = "obs_raw"
primary_key_name = "obs_raw_id"
- foreign_keys = [("meta_history", "history_id"), ("meta_vars", "vars_id")]
+ foreign_tables = [("meta_history", "history_id"), ("meta_vars", "vars_id")]


@pytest.mark.usefixtures("new_db_left")
@@ -36,7 +36,7 @@ def test_upgrade(
engine,
table_name,
primary_key_name,
- foreign_keys,
+ foreign_tables,
schema_name,
pri_columns_added=(("mod_user", VARCHAR),),
)