Skip to content

Commit

Permalink
rename ill-named constraints in dag_schedule_dataset_alias_reference …
Browse files Browse the repository at this point in the history
…table apache#43314
  • Loading branch information
Lee-W authored Oct 26, 2024
1 parent add8e9d commit 9a868e0
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Rename dag_schedule_dataset_alias_reference constraint names.
Revision ID: 5f2621c13b39
Revises: 22ed7efa9da2
Create Date: 2024-10-25 04:03:33.002701
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from alembic import op
from sqlalchemy import inspect

# revision identifiers, used by Alembic.
revision = "5f2621c13b39"
down_revision = "22ed7efa9da2"
branch_labels = None
depends_on = None
airflow_version = "2.10.3"

if TYPE_CHECKING:
from alembic.operations.base import BatchOperations
from sqlalchemy.sql.elements import conv


def _rename_fk_constraint(
*,
batch_op: BatchOperations,
original_name: str | conv,
new_name: str | conv,
referent_table: str,
local_cols: list[str],
remote_cols: list[str],
ondelete: str,
) -> None:
batch_op.drop_constraint(original_name, type_="foreignkey")
batch_op.create_foreign_key(
constraint_name=new_name,
referent_table=referent_table,
local_cols=local_cols,
remote_cols=remote_cols,
ondelete=ondelete,
)


def upgrade():
"""Rename dag_schedule_dataset_alias_reference constraint."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]

# "dsdar_dataset_alias_fkey" was the constraint name defined in the model while "dsdar_dataset_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dataset_fkey".
if "dsdar_dataset_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_fkey",
new_name="dsdar_dataset_alias_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)

# "dsdar_dag_fkey" was the constraint name defined in the model while "dsdar_dag_id_fkey" is the one
# defined in the previous migration.
# Rename this constraint name if user is using the name "dsdar_dag_fkey".
if "dsdar_dag_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_fkey",
new_name="dsdar_dag_id_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)


def downgrade():
"""Undo dag_schedule_dataset_alias_reference constraint rename."""
with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
bind = op.get_context().bind
insp = inspect(bind)
fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")]
if "dsdar_dataset_alias_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dataset_alias_fkey",
new_name="dsdar_dataset_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)

if "dsdar_dag_id_fkey" in fk_constraints:
_rename_fk_constraint(
batch_op=batch_op,
original_name="dsdar_dag_id_fkey",
new_name="dsdar_dag_fkey",
referent_table="dataset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
ondelete="CASCADE",
)
2 changes: 1 addition & 1 deletion airflow/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class DagScheduleDatasetAliasReference(Base):
ForeignKeyConstraint(
columns=(dag_id,),
refcolumns=["dag.dag_id"],
name="dsdar_dag_fkey",
name="dsdar_dag_id_fkey",
ondelete="CASCADE",
),
Index("idx_dag_schedule_dataset_alias_reference_dag_id", dag_id),
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
85015e607fda61f3f0a00e8f3ef327c54ccc7ccfd0185d0d862b3e01556fe60c
3009a8ae271c281d59ada514c7181f1be4f0e2c6ba37148d4d5bf57502edf1d1
Loading

0 comments on commit 9a868e0

Please sign in to comment.