Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add iceberg sync column types support #304

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@
{{ return(run_query(sql)) }}
{% endif %}
{% endmacro %}

{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%}
{#
  Rename a column with a single `change column` statement.

  relation:           relation to alter; its `type` and `render_pure()` output form the statement head
  source_column:      current column name
  target_column:      column name after the rename
  target_column_type: column type; passed through ddl_data_type() before being placed in the statement

  Returns whatever run_query returns for the statement.
#}
{%- set target_ddl_type = ddl_data_type(target_column_type) -%}
{% set rename_column_query -%}
alter {{ relation.type }} {{ relation.render_pure() }}
change column {{ source_column }} {{ target_column }} {{ target_ddl_type }}
{%- endset -%}
{%- set rename_result = run_query(rename_column_query) -%}
{{ return(rename_result) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,32 @@
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}
{% if table_type == 'iceberg' %}
{#
If the previous run of alter_column_type failed while renaming the tmp column back to its original name,
perform that rename now so the original column is not dropped and its data lost.
#}
{% for remove_col in remove_from_target_arr if remove_col.column.endswith('__dbt_alter') %}
{%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%}
{% for add_col in add_to_target_arr if add_col.column == origin_col_name %}
{%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%}
{%- do remove_from_target_arr.remove(remove_col) -%}
{%- do add_to_target_arr.remove(add_col) -%}
{% endfor %}
{% endfor %}

{% if add_to_target_arr | length > 0 %}
{%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
{% endif %}
{% if remove_from_target_arr | length > 0 %}
{%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%}
{% endif %}
{% if new_target_types != [] %}
{% for ntt in new_target_types %}
{% set column_name = ntt['column_name'] %}
{% set new_type = ntt['new_type'] %}
{% do alter_column_type(target_relation, column_name, new_type) %}
{% endfor %}
{% endif %}
{% else %}
{%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%}
{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %}
Expand All @@ -35,3 +55,34 @@
{% endset %}
{% do log(schema_change_message) %}
{% endmacro %}

{% macro athena__alter_column_type(relation, column_name, new_column_type) -%}
{#
  Change the type of a column by rebuilding it through a temporary column:
    1. add `<column_name>__dbt_alter` with the DDL form of the new type
    2. fill it with the existing column cast to the new type
    3. drop the existing column
    4. rename the temporary column back to the original name
  Each step is issued as its own query, in the order listed above.

  relation:        relation whose column is altered
  column_name:     name of the column to retype
  new_column_type: target type; used as given in the cast, and converted
                   with ddl_data_type() for the add-column statement
#}
{%- set staging_column = column_name + '__dbt_alter' -%}
{%- set staging_ddl_type = ddl_data_type(new_column_type) -%}

{#- Step 1: add the temporary column with the new DDL type -#}
{%- set add_staging_sql -%}
alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ staging_column }} {{ staging_ddl_type }});
{%- endset -%}
{%- do run_query(add_staging_sql) -%}

{#- Step 2: copy the data over, casting to the new type -#}
{%- set backfill_sql -%}
update {{ relation.render_pure() }} set {{ staging_column }} = cast({{ column_name }} as {{ new_column_type }});
{%- endset -%}
{%- do run_query(backfill_sql) -%}

{#- Step 3: drop the existing column -#}
{%- set drop_original_sql -%}
alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }};
{%- endset -%}
{%- do run_query(drop_original_sql) -%}

{#- Step 4: give the temporary column the original name -#}
{%- do alter_relation_rename_column(relation, staging_column, column_name, new_column_type) -%}

{% endmacro %}
9 changes: 8 additions & 1 deletion dbt/include/athena/macros/utils/ddl_dml_data_type.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{# Athena has different types between DML and DDL #}
{# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #}
{% macro ddl_data_type(col_type) -%}
-- transform varchar
{%- set table_type = config.get('table_type', 'hive') -%}

-- transform varchar
{% set re = modules.re %}
{% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %}

Expand All @@ -15,6 +17,11 @@
{% set data_type = data_type.replace('integer', 'int') -%}
{%- endif -%}

-- transform timestamp
{%- if table_type == 'iceberg' and 'timestamp' in data_type -%}
{% set data_type = 'timestamp' -%}
{%- endif -%}

{{ return(data_type) }}
{% endmacro %}

Expand Down