Skip to content

Commit

Permalink
feat: add iceberg sync column types support (#304)
Browse files Browse the repository at this point in the history
Co-authored-by: Oleksandr Kot <[email protected]>
Co-authored-by: Serhii Dimchenko <[email protected]>
Co-authored-by: Jérémy Guiselin <[email protected]>
  • Loading branch information
4 people authored Aug 4, 2023
1 parent 8d2a80c commit 3c1c6c1
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@
{{ return(run_query(sql)) }}
{% endif %}
{% endmacro %}

{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render_pure() }}
change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }}
{%- endset -%}
{{ return(run_query(sql)) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,32 @@
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}
{% if table_type == 'iceberg' %}
{#
If last run of alter_column_type was failed on rename tmp column to origin.
Do rename to protect origin column from deletion and losing data.
#}
{% for remove_col in remove_from_target_arr if remove_col.column.endswith('__dbt_alter') %}
{%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%}
{% for add_col in add_to_target_arr if add_col.column == origin_col_name %}
{%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%}
{%- do remove_from_target_arr.remove(remove_col) -%}
{%- do add_to_target_arr.remove(add_col) -%}
{% endfor %}
{% endfor %}

{% if add_to_target_arr | length > 0 %}
{%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
{% endif %}
{% if remove_from_target_arr | length > 0 %}
{%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%}
{% endif %}
{% if new_target_types != [] %}
{% for ntt in new_target_types %}
{% set column_name = ntt['column_name'] %}
{% set new_type = ntt['new_type'] %}
{% do alter_column_type(target_relation, column_name, new_type) %}
{% endfor %}
{% endif %}
{% else %}
{%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%}
{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %}
Expand All @@ -35,3 +55,34 @@
{% endset %}
{% do log(schema_change_message) %}
{% endmacro %}

{% macro athena__alter_column_type(relation, column_name, new_column_type) -%}
{#
1. Create a new column (w/ temp name and correct type)
2. Copy data over to it
3. Drop the existing column
4. Rename the new column to existing column
#}
{%- set tmp_column = column_name + '__dbt_alter' -%}
{%- set new_ddl_data_type = ddl_data_type(new_column_type) -%}

{#- do alter_relation_add_columns(relation, [ tmp_column ]) -#}
{%- set add_column_query -%}
alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ tmp_column }} {{ new_ddl_data_type }});
{%- endset -%}
{%- do run_query(add_column_query) -%}

{%- set update_query -%}
update {{ relation.render_pure() }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }});
{%- endset -%}
{%- do run_query(update_query) -%}

{#- do alter_relation_drop_columns(relation, [ column_name ]) -#}
{%- set drop_column_query -%}
alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }};
{%- endset -%}
{%- do run_query(drop_column_query) -%}

{%- do alter_relation_rename_column(relation, tmp_column, column_name, new_column_type) -%}

{% endmacro %}
9 changes: 8 additions & 1 deletion dbt/include/athena/macros/utils/ddl_dml_data_type.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{# Athena has different types between DML and DDL #}
{# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #}
{% macro ddl_data_type(col_type) -%}
-- transform varchar
{%- set table_type = config.get('table_type', 'hive') -%}

-- transform varchar
{% set re = modules.re %}
{% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %}

Expand All @@ -15,6 +17,11 @@
{% set data_type = data_type.replace('integer', 'int') -%}
{%- endif -%}

-- transform timestamp
{%- if table_type == 'iceberg' and 'timestamp' in data_type -%}
{% set data_type = 'timestamp' -%}
{%- endif -%}

{{ return(data_type) }}
{% endmacro %}

Expand Down

0 comments on commit 3c1c6c1

Please sign in to comment.