Skip to content

Commit

Permalink
fix: Invalid merge when unique_key == merge_update_columns (#406)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhii Dimchenko <[email protected]>
  • Loading branch information
artem-garmash and svdimchenko authored Sep 14, 2023
1 parent 9dec391 commit fc00e80
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,17 @@
when matched and ({{ delete_condition }})
then delete
{%- endif %}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
{% if update_columns -%}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
{%- endif %}
when not matched
then insert ({{ dest_cols_csv }})
values ({{ src_cols_csv }})
Expand Down
69 changes: 69 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import pytest

from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
BaseMergeExcludeColumns,
)
from dbt.tests.adapter.incremental.test_incremental_predicates import (
BaseIncrementalPredicates,
)
Expand Down Expand Up @@ -46,6 +49,42 @@
3,anyway,purple
"""

models__merge_exclude_all_columns_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
merge_exclude_columns=['msg', 'color']
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- data for subsequent incremental update

select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""


seeds__expected_merge_exclude_all_columns_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""


class TestIcebergIncrementalUniqueKey(BaseIncrementalUniqueKey):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -185,6 +224,36 @@ def test__incremental_predicates(self, project):
self.check_scenario_correctness(expected_fields, test_case_fields, project)


class TestIcebergMergeExcludeColumns(BaseMergeExcludeColumns):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_strategy": "merge",
"+table_type": "iceberg",
}
}


class TestIcebergMergeExcludeAllColumns(BaseMergeExcludeColumns):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_strategy": "merge",
"+table_type": "iceberg",
}
}

@pytest.fixture(scope="class")
def models(self):
return {"merge_exclude_columns.sql": models__merge_exclude_all_columns_sql}

@pytest.fixture(scope="class")
def seeds(self):
return {"expected_merge_exclude_columns.csv": seeds__expected_merge_exclude_all_columns_csv}


def replace_cast_date(model: str) -> str:
"""Wrap all date strings with a cast date function"""

Expand Down
115 changes: 115 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg_merge_no_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from collections import namedtuple

import pytest

from dbt.tests.util import check_relations_equal, run_dbt

models__merge_no_updates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'merge',
merge_update_columns = ['id'],
table_type = 'iceberg',
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- data for subsequent incremental update

select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""

seeds__expected_merge_no_updates_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""

ResultHolder = namedtuple(
"ResultHolder",
[
"seed_count",
"model_count",
"seed_rows",
"inc_test_model_count",
"relation",
],
)


class TestIncrementalIcebergMergeNoUpdates:
@pytest.fixture(scope="class")
def models(self):
return {"merge_no_updates.sql": models__merge_no_updates_sql}

@pytest.fixture(scope="class")
def seeds(self):
return {"expected_merge_no_updates.csv": seeds__expected_merge_no_updates_csv}

def update_incremental_model(self, incremental_model):
"""update incremental model after the seed table has been updated"""
model_result_set = run_dbt(["run", "--select", incremental_model])
return len(model_result_set)

def get_test_fields(self, project, seed, incremental_model, update_sql_file):
seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))

model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))

relation = incremental_model
# update seed in anticipation of incremental model update
row_count_query = f"select * from {project.test_schema}.{seed}"

seed_rows = len(project.run_sql(row_count_query, fetch="all"))

# propagate seed state to incremental model according to unique keys
inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)

return ResultHolder(seed_count, model_count, seed_rows, inc_test_model_count, relation)

def check_scenario_correctness(self, expected_fields, test_case_fields, project):
"""Invoke assertions to verify correct build functionality"""
# 1. test seed(s) should build afresh
assert expected_fields.seed_count == test_case_fields.seed_count
# 2. test model(s) should build afresh
assert expected_fields.model_count == test_case_fields.model_count
# 3. seeds should have intended row counts post update
assert expected_fields.seed_rows == test_case_fields.seed_rows
# 4. incremental test model(s) should be updated
assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
# 5. result table should match intended result set (itself a relation)
check_relations_equal(project.adapter, [expected_fields.relation, test_case_fields.relation])

def test__merge_no_updates(self, project):
"""seed should match model after incremental run"""

expected_fields = ResultHolder(
seed_count=1,
model_count=1,
inc_test_model_count=1,
seed_rows=3,
relation="expected_merge_no_updates",
)

test_case_fields = self.get_test_fields(
project,
seed="expected_merge_no_updates",
incremental_model="merge_no_updates",
update_sql_file=None,
)
self.check_scenario_correctness(expected_fields, test_case_fields, project)

0 comments on commit fc00e80

Please sign in to comment.