Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko authored Sep 14, 2023
2 parents 775f85e + 2b65fb0 commit 1adbc5d
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 10 deletions.
3 changes: 2 additions & 1 deletion dbt/include/athena/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
{%- if col['data_type'] is not defined -%}
{{ col_err.append(col['name']) }}
{%- else -%}
cast(null as {{ dml_data_type(col['data_type']) }}) as {{ col['name'] }}{{ ", " if not loop.last }}
{% set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] %}
cast(null as {{ dml_data_type(col['data_type']) }}) as {{ col_name }}{{ ", " if not loop.last }}
{%- endif -%}
{%- endfor -%}
{%- if (col_err | length) > 0 -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,17 @@
when matched and ({{ delete_condition }})
then delete
{%- endif %}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
{% if update_columns -%}
when matched
then update set
{%- for col in update_columns %}
{%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
{{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
{%- else -%}
{{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
{%- endif -%}
{%- endfor %}
{%- endif %}
when not matched
then insert ({{ dest_cols_csv }})
values ({{ src_cols_csv }})
Expand Down
26 changes: 26 additions & 0 deletions tests/functional/adapter/test_constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest

from dbt.tests.adapter.constraints.fixtures import (
model_quoted_column_schema_yml,
my_model_with_quoted_column_name_sql,
)
from dbt.tests.adapter.constraints.test_constraints import BaseConstraintQuotedColumn


class TestAthenaConstraintQuotedColumn(BaseConstraintQuotedColumn):
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": my_model_with_quoted_column_name_sql,
"constraints_schema.yml": model_quoted_column_schema_yml.replace("text", "string"),
}

@pytest.fixture(scope="class")
def expected_sql(self):
# FIXME: dbt-athena outputs a query about stats into `target/run/` directory.
# dbt-core expects the query to be a ddl statement to create a table.
# This is a workaround to pass the test for now.

# NOTE: by the above reason, this test just checks the query can be executed without errors.
# The query itself is not checked.
return 'SELECT \'{"rowcount":1,"data_scanned_in_bytes":0}\';'
69 changes: 69 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import pytest

from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
BaseMergeExcludeColumns,
)
from dbt.tests.adapter.incremental.test_incremental_predicates import (
BaseIncrementalPredicates,
)
Expand Down Expand Up @@ -46,6 +49,42 @@
3,anyway,purple
"""

models__merge_exclude_all_columns_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
merge_exclude_columns=['msg', 'color']
) }}
{% if not is_incremental() %}
-- data for first invocation of model
select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color
{% else %}
-- data for subsequent incremental update
select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color
{% endif %}
"""


seeds__expected_merge_exclude_all_columns_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""


class TestIcebergIncrementalUniqueKey(BaseIncrementalUniqueKey):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -185,6 +224,36 @@ def test__incremental_predicates(self, project):
self.check_scenario_correctness(expected_fields, test_case_fields, project)


class TestIcebergMergeExcludeColumns(BaseMergeExcludeColumns):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_strategy": "merge",
"+table_type": "iceberg",
}
}


class TestIcebergMergeExcludeAllColumns(BaseMergeExcludeColumns):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_strategy": "merge",
"+table_type": "iceberg",
}
}

@pytest.fixture(scope="class")
def models(self):
return {"merge_exclude_columns.sql": models__merge_exclude_all_columns_sql}

@pytest.fixture(scope="class")
def seeds(self):
return {"expected_merge_exclude_columns.csv": seeds__expected_merge_exclude_all_columns_csv}


def replace_cast_date(model: str) -> str:
"""Wrap all date strings with a cast date function"""

Expand Down
115 changes: 115 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg_merge_no_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from collections import namedtuple

import pytest

from dbt.tests.util import check_relations_equal, run_dbt

models__merge_no_updates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'merge',
merge_update_columns = ['id'],
table_type = 'iceberg',
) }}
{% if not is_incremental() %}
-- data for first invocation of model
select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color
{% else %}
-- data for subsequent incremental update
select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color
{% endif %}
"""

seeds__expected_merge_no_updates_csv = """id,msg,color
1,hello,blue
2,goodbye,red
3,anyway,purple
"""

ResultHolder = namedtuple(
"ResultHolder",
[
"seed_count",
"model_count",
"seed_rows",
"inc_test_model_count",
"relation",
],
)


class TestIncrementalIcebergMergeNoUpdates:
@pytest.fixture(scope="class")
def models(self):
return {"merge_no_updates.sql": models__merge_no_updates_sql}

@pytest.fixture(scope="class")
def seeds(self):
return {"expected_merge_no_updates.csv": seeds__expected_merge_no_updates_csv}

def update_incremental_model(self, incremental_model):
"""update incremental model after the seed table has been updated"""
model_result_set = run_dbt(["run", "--select", incremental_model])
return len(model_result_set)

def get_test_fields(self, project, seed, incremental_model, update_sql_file):
seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))

model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))

relation = incremental_model
# update seed in anticipation of incremental model update
row_count_query = f"select * from {project.test_schema}.{seed}"

seed_rows = len(project.run_sql(row_count_query, fetch="all"))

# propagate seed state to incremental model according to unique keys
inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)

return ResultHolder(seed_count, model_count, seed_rows, inc_test_model_count, relation)

def check_scenario_correctness(self, expected_fields, test_case_fields, project):
"""Invoke assertions to verify correct build functionality"""
# 1. test seed(s) should build afresh
assert expected_fields.seed_count == test_case_fields.seed_count
# 2. test model(s) should build afresh
assert expected_fields.model_count == test_case_fields.model_count
# 3. seeds should have intended row counts post update
assert expected_fields.seed_rows == test_case_fields.seed_rows
# 4. incremental test model(s) should be updated
assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
# 5. result table should match intended result set (itself a relation)
check_relations_equal(project.adapter, [expected_fields.relation, test_case_fields.relation])

def test__merge_no_updates(self, project):
"""seed should match model after incremental run"""

expected_fields = ResultHolder(
seed_count=1,
model_count=1,
inc_test_model_count=1,
seed_rows=3,
relation="expected_merge_no_updates",
)

test_case_fields = self.get_test_fields(
project,
seed="expected_merge_no_updates",
incremental_model="merge_no_updates",
update_sql_file=None,
)
self.check_scenario_correctness(expected_fields, test_case_fields, project)

0 comments on commit 1adbc5d

Please sign in to comment.