Skip to content

Commit

Permalink
fix(migrations): add 0027_2_10_3_fix_dag_schedule_dataset_alias_refer…
Browse files Browse the repository at this point in the history
…ence_naming to fix constraint naming (#43373)
  • Loading branch information
Lee-W authored Oct 28, 2024
1 parent e9192f5 commit ffe7751
Show file tree
Hide file tree
Showing 20 changed files with 144 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def upgrade():
sa.ForeignKeyConstraint(
("alias_id",),
["dataset_alias.id"],
name="dsdar_dataset_alias_fkey",
name="dsdar_dataset_fkey",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Rename dag_schedule_dataset_alias_reference constraint names.
Revision ID: 5f2621c13b39
Revises: 22ed7efa9da2
Create Date: 2024-10-25 04:03:33.002701
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from alembic import op
from sqlalchemy import inspect

# revision identifiers, used by Alembic.
revision = "5f2621c13b39"
down_revision = "22ed7efa9da2"
branch_labels = None
depends_on = None
airflow_version = "2.10.3"

if TYPE_CHECKING:
from alembic.operations.base import BatchOperations
from sqlalchemy.sql.elements import conv


def _rename_fk_constraint(
*,
batch_op: BatchOperations,
original_name: str | conv,
new_name: str | conv,
referent_table: str,
local_cols: list[str],
remote_cols: list[str],
ondelete: str,
) -> None:
batch_op.drop_constraint(original_name, type_="foreignkey")
batch_op.create_foreign_key(
constraint_name=new_name,
referent_table=referent_table,
local_cols=local_cols,
remote_cols=remote_cols,
ondelete=ondelete,
)


def upgrade():
"""Rename dag_schedule_dataset_alias_reference constraint."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]

# "dsdar_dataset_alias_fkey" was the constraint name defined in the model while "dsdar_dataset_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dataset_fkey".
if "dsdar_dataset_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_fkey",
new_name="dsdar_dataset_alias_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)

# "dsdar_dag_fkey" was the constraint name defined in the model while "dsdar_dag_id_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dag_fkey".
if "dsdar_dag_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_fkey",
new_name="dsdar_dag_id_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)


def downgrade():
"""Undo dag_schedule_dataset_alias_reference constraint rename."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]
if "dsdar_dataset_alias_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_alias_fkey",
new_name="dsdar_dataset_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)

if "dsdar_dag_id_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_id_fkey",
new_name="dsdar_dag_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Drop ab_user.id foreign key.
Revision ID: 044f740568ec
Revises: 22ed7efa9da2
Revises: 5f2621c13b39
Create Date: 2024-08-02 07:18:29.830521
"""
Expand All @@ -31,7 +31,7 @@

# revision identifiers, used by Alembic.
revision = "044f740568ec"
down_revision = "22ed7efa9da2"
down_revision = "5f2621c13b39"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def upgrade():

with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op:
batch_op.drop_constraint("dsdar_dataset_alias_fkey", type_="foreignkey")
batch_op.drop_constraint("dsdar_dag_fkey", type_="foreignkey")
if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("dsdar_dag_id_fkey", type_="foreignkey")

_rename_pk_constraint(
batch_op=batch_op,
Expand All @@ -218,7 +219,7 @@ def upgrade():
ondelete="CASCADE",
)
batch_op.create_foreign_key(
constraint_name="dsaar_dag_fkey",
constraint_name="dsaar_dag_id_fkey",
referent_table="dag",
local_cols=["dag_id"],
remote_cols=["dag_id"],
Expand Down Expand Up @@ -495,7 +496,7 @@ def downgrade():

with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
batch_op.drop_constraint("dsaar_asset_alias_fkey", type_="foreignkey")
batch_op.drop_constraint("dsaar_dag_fkey", type_="foreignkey")
batch_op.drop_constraint("dsaar_dag_id_fkey", type_="foreignkey")

_rename_pk_constraint(
batch_op=batch_op,
Expand All @@ -519,7 +520,7 @@ def downgrade():
ondelete="CASCADE",
)
batch_op.create_foreign_key(
constraint_name="dsdar_dag_fkey",
constraint_name="dsdar_dag_id_fkey",
referent_table="dag",
local_cols=["dag_id"],
remote_cols=["dag_id"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class DagScheduleAssetAliasReference(Base):
ForeignKeyConstraint(
columns=(dag_id,),
refcolumns=["dag.dag_id"],
name="dsaar_dag_fkey",
name="dsaar_dag_id_fkey",
ondelete="CASCADE",
),
Index("idx_dag_schedule_asset_alias_reference_dag_id", dag_id),
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "05234396c6fc",
}

Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
bda4fb36d2ac0f34e60a9969b6c1c1e6a98b555b0fc6d0e7bfcee9a89fb95fbf
5067b6eac290220d23ce1bef6ce6d02478757f626f5a81a53ca2bba8e31c094c
5 changes: 4 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
| ``d0f1c55954fa`` | ``044f740568ec`` | ``3.0.0`` | Remove SubDAGs: ``is_subdag`` & ``root_dag_id`` columns from |
| | | | DAG table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``044f740568ec`` | ``22ed7efa9da2`` | ``3.0.0`` | Drop ab_user.id foreign key. |
| ``044f740568ec`` | ``5f2621c13b39`` | ``3.0.0`` | Drop ab_user.id foreign key. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``5f2621c13b39`` | ``22ed7efa9da2`` | ``2.10.3`` | Rename dag_schedule_dataset_alias_reference constraint |
| | | | names. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``22ed7efa9da2`` | ``8684e37832e6`` | ``2.10.0`` | Add dag_schedule_dataset_alias_reference table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down

0 comments on commit ffe7751

Please sign in to comment.