From b6dd6b256293484b8350e36056634d785b09713e Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Mon, 15 May 2023 16:27:33 +0300 Subject: [PATCH 1/7] Add sync column types support for iceberg tables --- .../models/incremental/on_schema_change.sql | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql index 3d9f8137..d1b0f084 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql @@ -19,6 +19,13 @@ {% if remove_from_target_arr | length > 0 %} {%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%} {% endif %} + {% if new_target_types != [] %} + {% for ntt in new_target_types %} + {% set column_name = ntt['column_name'] %} + {% set new_type = ntt['new_type'] %} + {% do alter_column_type(target_relation, column_name, new_type) %} + {% endfor %} + {% endif %} {% else %} {%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%} {% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %} @@ -35,3 +42,38 @@ {% endset %} {% do log(schema_change_message) %} {% endmacro %} + +{% macro athena__alter_column_type(relation, column_name, new_column_type) -%} + {# + 1. Create a new column (w/ temp name and correct type) + 2. Copy data over to it + 3. Drop the existing column + 4. Rename the new column to existing column + #} + {%- set tmp_column = column_name + "__dbt_alter" -%} + + {# Fix bug in Athena: systax error when using quoted table name #} + {%- set relation = relation.render_pure() -%} + + {%- set add_column_query -%} + alter table {{ relation }} add columns({{ tmp_column }} {{ new_column_type }}); + {%- endset -%} + + {%- set update_query -%} + update {{ relation }} set {{ tmp_column }} = {{ column_name }}; + {%- endset -%} + + {%- set drop_column_query -%} + alter table {{ relation }} drop column {{ column_name }}; + {%- endset -%} + + {%- set rename_column_query -%} + alter table {{ relation }} change column {{ tmp_column }} {{ column_name }} {{ new_column_type }}; + {%- endset -%} + + {%- do run_query(add_column_query) -%} + {%- do run_query(update_query) -%} + {%- do run_query(drop_column_query) -%} + {%- do run_query(rename_column_query) -%} + +{% endmacro %} From da76efaa6a8659e055169d4244f9cfcdf3ae31e8 Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Mon, 15 May 2023 16:55:14 +0300 Subject: [PATCH 2/7] Fix comment --- .../materializations/models/incremental/on_schema_change.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql index d1b0f084..33941614 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql @@ -52,7 +52,7 @@ #} {%- set tmp_column = column_name + "__dbt_alter" -%} - {# Fix bug in Athena: systax error when using quoted table name #} + {# Fix bug in Athena: error when using quoted table name #} {%- set relation = relation.render_pure() -%} {%- set add_column_query -%} From 56b02bb6534e0675ff5905aa7792a45b2c57338b Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Mon, 15 May 2023 20:53:34 +0300 Subject: [PATCH 3/7] Add found Athena bugs fixes --- .../models/incremental/on_schema_change.sql | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql index 33941614..c0df9820 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql @@ -55,12 +55,24 @@ {# Fix bug in Athena: error when using quoted table name #} {%- set relation = relation.render_pure() -%} + {# Fix bug in Athena: iceberg alter statements work requires "string" column type instead of "varchar" #} + {%- set new_iceberg_column_type = new_column_type | replace('varchar', 'string') -%} + + {# Fix bug in Athena: iceberg alter statements work requires "timestamp" column type instead of "timestamp(X)" #} + {# + Note: if it needs to convert varchar column type with timestamp values to date, + you need to convert it to timestamp type or update with truncating timestamp values to date + #} + {%- if 'timestamp' in new_column_type -%} + {%- set new_iceberg_column_type = 'timestamp' -%} + {%- endif -%} + {%- set add_column_query -%} - alter table {{ relation }} add columns({{ tmp_column }} {{ new_column_type }}); + alter table {{ relation }} add columns({{ tmp_column }} {{ new_iceberg_column_type }}); {%- endset -%} {%- set update_query -%} - update {{ relation }} set {{ tmp_column }} = {{ column_name }}; + update {{ relation }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }}); {%- endset -%} {%- set drop_column_query -%} @@ -68,7 +80,7 @@ {%- endset -%} {%- set rename_column_query -%} - alter table {{ relation }} change column {{ tmp_column }} {{ column_name }} {{ new_column_type }}; + alter table {{ relation }} change column {{ tmp_column }} {{ column_name }} {{ new_iceberg_column_type }}; {%- endset -%} {%- do run_query(add_column_query) -%} From 7d7e748a913da5bf77525e2b79ecb4c3a2d6f241 Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Mon, 15 May 2023 21:00:05 +0300 Subject: [PATCH 4/7] Fix comments --- .../materializations/models/incremental/on_schema_change.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql index c0df9820..b9224ba1 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql @@ -55,10 +55,10 @@ {# Fix bug in Athena: error when using quoted table name #} {%- set relation = relation.render_pure() -%} - {# Fix bug in Athena: iceberg alter statements work requires "string" column type instead of "varchar" #} + {# Fix bug in Athena: iceberg alter statements requires "string" column type instead of "varchar" #} {%- set new_iceberg_column_type = new_column_type | replace('varchar', 'string') -%} - {# Fix bug in Athena: iceberg alter statements work requires "timestamp" column type instead of "timestamp(X)" #} + {# Fix bug in Athena: iceberg alter statements requires "timestamp" column type instead of "timestamp(X)" #} {# Note: if it needs to convert varchar column type with timestamp values to date, you need to convert it to timestamp type or update with truncating timestamp values to date From 54a526e40b7c66b65267086c296109b837825faf Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Wed, 21 Jun 2023 14:24:10 +0300 Subject: [PATCH 5/7] Add protection of origin table when fail on rename. Use ddl_dml_data_type to change type for iceberg --- .../models/incremental/column_helpers.sql | 8 +++ .../models/incremental/on_schema_change.sql | 51 +++++++++---------- .../materializations/snapshots/snapshot.sql | 2 +- .../athena/macros/utils/ddl_dml_data_type.sql | 9 +++- 4 files changed, 41 insertions(+), 29 deletions(-) diff --git a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql index 53b9bfc0..e8263295 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql @@ -48,3 +48,11 @@ {{ return(run_query(sql)) }} {% endif %} {% endmacro %} + +{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%} + {% set sql -%} + alter {{ relation.type }} {{ relation.render_pure() }} + change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }} + {%- endset -%} + {{ return(run_query(sql)) }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql index b9224ba1..ef641b70 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql @@ -13,6 +13,19 @@ {%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%} {%- set new_target_types = schema_changes_dict['new_target_types'] -%} {% if table_type == 'iceberg' %} + {# + If last run of alter_column_type was failed on rename tmp column to origin. + Do rename to protect origin column from deletion and losing data. + #} + {% for remove_col in remove_from_target_arr if remove_col.column.endswith('__dbt_alter') %} + {%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%} + {% for add_col in add_to_target_arr if add_col.column == origin_col_name %} + {%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%} + {%- do remove_from_target_arr.remove(remove_col) -%} + {%- do add_to_target_arr.remove(add_col) -%} + {% endfor %} + {% endfor %} + {% if add_to_target_arr | length > 0 %} {%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%} {% endif %} @@ -50,42 +63,26 @@ 3. Drop the existing column 4. Rename the new column to existing column #} - {%- set tmp_column = column_name + "__dbt_alter" -%} - - {# Fix bug in Athena: error when using quoted table name #} - {%- set relation = relation.render_pure() -%} - - {# Fix bug in Athena: iceberg alter statements requires "string" column type instead of "varchar" #} - {%- set new_iceberg_column_type = new_column_type | replace('varchar', 'string') -%} - - {# Fix bug in Athena: iceberg alter statements requires "timestamp" column type instead of "timestamp(X)" #} - {# - Note: if it needs to convert varchar column type with timestamp values to date, - you need to convert it to timestamp type or update with truncating timestamp values to date - #} - {%- if 'timestamp' in new_column_type -%} - {%- set new_iceberg_column_type = 'timestamp' -%} - {%- endif -%} + {%- set tmp_column = column_name + '__dbt_alter' -%} + {%- set new_ddl_data_type = ddl_data_type(new_column_type) -%} + {#- do alter_relation_add_columns(relation, [ tmp_column ]) -#} {%- set add_column_query -%} - alter table {{ relation }} add columns({{ tmp_column }} {{ new_iceberg_column_type }}); + alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ tmp_column }} {{ new_ddl_data_type }}); {%- endset -%} + {%- do run_query(add_column_query) -%} {%- set update_query -%} - update {{ relation }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }}); + update {{ relation.render_pure() }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }}); {%- endset -%} + {%- do run_query(update_query) -%} + {#- do alter_relation_drop_columns(relation, [ column_name ]) -#} {%- set drop_column_query -%} - alter table {{ relation }} drop column {{ column_name }}; - {%- endset -%} - - {%- set rename_column_query -%} - alter table {{ relation }} change column {{ tmp_column }} {{ column_name }} {{ new_iceberg_column_type }}; + alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }}; {%- endset -%} - - {%- do run_query(add_column_query) -%} - {%- do run_query(update_query) -%} {%- do run_query(drop_column_query) -%} - {%- do run_query(rename_column_query) -%} + + {%- do alter_relation_rename_column(relation, tmp_column, column_name, new_column_type) -%} {% endmacro %} diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 208f3e9d..10d7a8ee 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -15,7 +15,7 @@ If hive table then Recreate the snapshot table from the new_snapshot_table If iceberg table then Update the standard snapshot merge to include the DBT_INTERNAL_SOURCE prefix in the src_cols_csv #} -{% macro hive_snapshot_merge_sql(target, source, insert_cols, table_type) -%} +{% macro hive_snapshot_merge_sql(target, source, insert_cols) -%} {%- set target_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=target.identifier) -%} {%- if target_relation is not none -%} {% do adapter.drop_relation(target_relation) %} diff --git a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql index a97c489c..ec726627 100644 --- a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql +++ b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql @@ -1,7 +1,9 @@ {# Athena has different types between DML and DDL #} {# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #} {% macro ddl_data_type(col_type) -%} - -- transform varchar + {%- set table_type = config.get('table_type', 'hive') -%} + + -- transform varchar {% set re = modules.re %} {% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %} @@ -15,6 +17,11 @@ {% set data_type = data_type.replace('integer', 'int') -%} {%- endif -%} + -- transform timestamp + {%- if table_type == 'iceberg' and 'timestamp' in data_type -%} + {% set data_type = 'timestamp' -%} + {%- endif -%} + {{ return(data_type) }} {% endmacro %} From 4a4f98d28fd0d33b555cd6a09d83a31a16631391 Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Thu, 22 Jun 2023 12:28:46 +0300 Subject: [PATCH 6/7] Remove unused param from hive_snapshot_merge_sql --- .../athena/macros/materializations/snapshots/snapshot.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 10d7a8ee..1da07926 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -15,7 +15,7 @@ If hive table then Recreate the snapshot table from the new_snapshot_table If iceberg table then Update the standard snapshot merge to include the DBT_INTERNAL_SOURCE prefix in the src_cols_csv #} -{% macro hive_snapshot_merge_sql(target, source, insert_cols) -%} +{% macro hive_snapshot_merge_sql(target, source) -%} {%- set target_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=target.identifier) -%} {%- if target_relation is not none -%} {% do adapter.drop_relation(target_relation) %} From 1c843d0491ce3e201a36414190edeaf5b0e9a8f2 Mon Sep 17 00:00:00 2001 From: Oleksandr Kot Date: Thu, 22 Jun 2023 13:52:50 +0300 Subject: [PATCH 7/7] Rollback changes in hive_snapshot_merge_sql params --- .../athena/macros/materializations/snapshots/snapshot.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 1da07926..208f3e9d 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -15,7 +15,7 @@ If hive table then Recreate the snapshot table from the new_snapshot_table If iceberg table then Update the standard snapshot merge to include the DBT_INTERNAL_SOURCE prefix in the src_cols_csv #} -{% macro hive_snapshot_merge_sql(target, source) -%} +{% macro hive_snapshot_merge_sql(target, source, insert_cols, table_type) -%} {%- set target_relation = adapter.get_relation(database=target.database, schema=target.schema, identifier=target.identifier) -%} {%- if target_relation is not none -%} {% do adapter.drop_relation(target_relation) %}