diff --git a/README.md b/README.md index 035ed10a..ccc82e72 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,6 @@ * Does **not** support the use of `unique_key` * Supports [snapshots][snapshots] * Does not support [Python models][python-models] -* Does not support [persist docs][persist-docs] for views [seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds [incremental]: https://docs.getdbt.com/docs/build/incremental-models diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index e69cf1d7..17e65ad5 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -37,6 +37,11 @@ logger = AdapterLogger("Athena") +@dataclass +class AthenaAdapterResponse(AdapterResponse): + data_scanned_in_bytes: Optional[int] = None + + @dataclass class AthenaCredentials(Credentials): s3_staging_dir: str @@ -127,6 +132,7 @@ def execute( # type: ignore endpoint_url: Optional[str] = None, cache_size: int = 0, cache_expiration_time: int = 0, + catch_partitions_limit: bool = False, **kwargs, ): def inner() -> AthenaCursor: @@ -153,7 +159,12 @@ def inner() -> AthenaCursor: return self retry = tenacity.Retrying( - retry=retry_if_exception(lambda _: True), + # No need to retry if TOO_MANY_OPEN_PARTITIONS occurs. + # Otherwise, Athena throws ICEBERG_FILESYSTEM_ERROR after retry, + # because not all files are removed immediately after first try to create table + retry=retry_if_exception( + lambda e: False if catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e) else True + ), stop=stop_after_attempt(self._retry_config.attempt), wait=wait_exponential( multiplier=self._retry_config.attempt, @@ -198,6 +209,7 @@ def open(cls, connection: Connection) -> Connection: handle = AthenaConnection( s3_staging_dir=creds.s3_staging_dir, endpoint_url=creds.endpoint_url, + catalog_name=creds.database, schema_name=creds.schema, work_group=creds.work_group, cursor_class=AthenaCursor, @@ -223,9 +235,14 @@ def open(cls, connection: Connection) -> Connection: return connection @classmethod - def get_response(cls, cursor: AthenaCursor) -> AdapterResponse: + def get_response(cls, cursor: AthenaCursor) -> AthenaAdapterResponse: code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR" - return AdapterResponse(_message=f"{code} {cursor.rowcount}", rows_affected=cursor.rowcount, code=code) + return AthenaAdapterResponse( + _message=f"{code} {cursor.rowcount}", + rows_affected=cursor.rowcount, + code=code, + data_scanned_in_bytes=cursor.data_scanned_in_bytes, + ) def cancel(self, connection: Connection) -> None: connection.handle.cancel() diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 9ef979ec..4d6dccc2 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -1,6 +1,7 @@ import csv import os import posixpath as path +import re import tempfile from itertools import chain from textwrap import dedent @@ -15,9 +16,11 @@ from mypy_boto3_glue.type_defs import ( ColumnTypeDef, GetTableResponseTypeDef, + TableInputTypeDef, TableTypeDef, TableVersionTypeDef, ) +from pyathena.error import OperationalError from dbt.adapters.athena import AthenaConnectionManager from dbt.adapters.athena.column import AthenaColumn @@ -636,35 +639,66 @@ def persist_docs_to_glue( model: Dict[str, Any], persist_relation_docs: bool = False, persist_column_docs: bool = False, + skip_archive_table_version: bool = False, ) -> None: + """Save model/columns description to Glue Table metadata. 
+
+        :param skip_archive_table_version: if True, the current table version will not be archived before creating a new one.
+            The purpose is to avoid creating a redundant table version if one was already created during the same dbt run,
+            after CREATE OR REPLACE VIEW or ALTER TABLE statements.
+            Every dbt run should create no more than one table version.
+        """
         conn = self.connections.get_thread_connection()
         client = conn.handle
         with boto3_client_lock:
             glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())
 
-        table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table")
-        updated_table = {
-            "Name": table["Name"],
-            "StorageDescriptor": table["StorageDescriptor"],
-            "PartitionKeys": table.get("PartitionKeys", []),
-            "TableType": table["TableType"],
-            "Parameters": table.get("Parameters", {}),
-            "Description": table.get("Description", ""),
-        }
+        # By default, there is no need to update the Glue table
+        need_update_table = False
+        # Get the table from Glue
+        table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
+        # Prepare a new version of the Glue table, picking up only the significant fields
+        updated_table = self._get_table_input(table)
+        # Update table description
         if persist_relation_docs:
-            table_comment = clean_sql_comment(model["description"])
-            updated_table["Description"] = table_comment
-            updated_table["Parameters"]["comment"] = table_comment
-
+            # Prepare dbt description
+            clean_table_description = clean_sql_comment(model["description"])
+            # Get current description from Glue
+            glue_table_description = table.get("Description", "")
+            # Get current description parameter from Glue
+            glue_table_comment = table["Parameters"].get("comment", "")
+            # Update description only if it differs
+            if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
+                updated_table["Description"] = clean_table_description
+                updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
+                updated_table_parameters["comment"] = clean_table_description
+                updated_table["Parameters"] = updated_table_parameters
+                need_update_table = True
+
+        # Update column comments
         if persist_column_docs:
+            # Process every column
             for col_obj in updated_table["StorageDescriptor"]["Columns"]:
+                # Get column description from dbt
                 col_name = col_obj["Name"]
-                col_comment = model["columns"].get(col_name, {}).get("description")
-                if col_comment:
-                    col_obj["Comment"] = clean_sql_comment(col_comment)
-
-        glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table)
+                if col_name in model["columns"]:
+                    col_comment = model["columns"][col_name]["description"]
+                    # Prepare column description from dbt
+                    clean_col_comment = clean_sql_comment(col_comment)
+                    # Get current column comment from Glue
+                    glue_col_comment = col_obj.get("Comment", "")
+                    # Update column description only if it differs
+                    if glue_col_comment != clean_col_comment:
+                        col_obj["Comment"] = clean_col_comment
+                        need_update_table = True
+
+        # Update the Glue table only if the table/column descriptions were modified.
+        # This prevents creating redundant schema versions after incremental runs.
+        if need_update_table:
+            glue_client.update_table(
+                DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
+            )
 
     @available
     def list_schemas(self, database: str) -> List[str]:
@@ -833,3 +867,36 @@ def is_list(self, value: Any) -> bool:
         a list since this is complicated with purely Jinja syntax.
""" return isinstance(value, list) + + @staticmethod + def _get_table_input(table: TableTypeDef) -> TableInputTypeDef: + """ + Prepare Glue Table dictionary to be a table_input argument of update_table() method. + + This is needed because update_table() does not accept some read-only fields of table dictionary + returned by get_table() method. + """ + return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__} + + @available + def run_query_with_partitions_limit_catching(self, sql: str) -> str: + conn = self.connections.get_thread_connection() + cursor = conn.handle.cursor() + try: + cursor.execute(sql, catch_partitions_limit=True) + except OperationalError as e: + LOGGER.debug(f"CAUGHT EXCEPTION: {e}") + if "TOO_MANY_OPEN_PARTITIONS" in str(e): + return "TOO_MANY_OPEN_PARTITIONS" + raise e + return "SUCCESS" + + @available + def format_partition_keys(self, partition_keys: List[str]) -> str: + return ", ".join([self.format_one_partition_key(k) for k in partition_keys]) + + @available + def format_one_partition_key(self, partition_key: str) -> str: + """Check if partition key uses Iceberg hidden partitioning""" + hidden = re.search(r"^(hour|day|month|year)\((.+)\)", partition_key.lower()) + return f"date_trunc('{hidden.group(1)}', {hidden.group(2)})" if hidden else partition_key.lower() diff --git a/dbt/include/athena/macros/adapters/persist_docs.sql b/dbt/include/athena/macros/adapters/persist_docs.sql index 2a7b8748..78503ba9 100644 --- a/dbt/include/athena/macros/adapters/persist_docs.sql +++ b/dbt/include/athena/macros/adapters/persist_docs.sql @@ -1,10 +1,12 @@ {% macro athena__persist_docs(relation, model, for_relation, for_columns) -%} {% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %} {% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %} - {% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %} + {% set skip_archive_table_version = not is_incremental() %} + {% if persist_relation_docs or persist_column_docs %} {% do adapter.persist_docs_to_glue(relation, model, persist_relation_docs, - persist_column_docs) %}} + persist_column_docs, + skip_archive_table_version=skip_archive_table_version) %}} {% endif %} {% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql new file mode 100644 index 00000000..20773841 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql @@ -0,0 +1,52 @@ +{% macro get_partition_batches(sql) -%} + {%- set partitioned_by = config.get('partitioned_by') -%} + {%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%} + {%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%} + {% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %} + + {% call statement('get_partitions', fetch_result=True) %} + select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }}; + {% endcall %} + + {%- set table = load_result('get_partitions').table -%} + {%- set rows = table.rows -%} + {%- set partitions = {} -%} + {% do log('TOTAL PARTITIONS TO PROCESS: ' ~ rows | length) %} + {%- set partitions_batches = [] -%} + + {%- for row in rows -%} + {%- set single_partition = [] -%} + {%- for col in row -%} + + {%- set column_type = adapter.convert_type(table, loop.index0) -%} + {%- if column_type == 
'integer' -%} + {%- set value = col | string -%} + {%- elif column_type == 'string' -%} + {%- set value = "'" + col + "'" -%} + {%- elif column_type == 'date' -%} + {%- set value = "DATE'" + col | string + "'" -%} + {%- elif column_type == 'timestamp' -%} + {%- set value = "TIMESTAMP'" + col | string + "'" -%} + {%- else -%} + {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%} + {%- endif -%} + {%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%} + {%- do single_partition.append(partition_key + '=' + value) -%} + {%- endfor -%} + + {%- set single_partition_expression = single_partition | join(' and ') -%} + + {%- set batch_number = (loop.index0 / athena_partitions_limit) | int -%} + {% if not batch_number in partitions %} + {% do partitions.update({batch_number: []}) %} + {% endif %} + + {%- do partitions[batch_number].append('(' + single_partition_expression + ')') -%} + {%- if partitions[batch_number] | length == athena_partitions_limit or loop.last -%} + {%- do partitions_batches.append(partitions[batch_number] | join(' or ')) -%} + {%- endif -%} + {%- endfor -%} + + {{ return(partitions_batches) }} + +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql index 53b9bfc0..e8263295 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql @@ -48,3 +48,11 @@ {{ return(run_query(sql)) }} {% endif %} {% endmacro %} + +{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%} + {% set sql -%} + alter {{ relation.type }} {{ relation.render_pure() }} + change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }} + {%- endset -%} + {{ return(run_query(sql)) }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql index a965e85c..8c253dcf 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql @@ -22,19 +22,42 @@ {% endmacro %} {% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %} - {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} - {% if not dest_columns %} + {%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%} + {%- if not dest_columns -%} {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {% endif %} + {%- endif -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - insert into {{ target_relation }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ tmp_relation }} - ); + {%- set insert_full -%} + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + ); + {%- endset -%} + + {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%} + {%- do log('QUERY RESULT: ' ~ query_result) -%} + {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%} + {% set partitions_batches = get_partition_batches(tmp_relation) %} + {% do log('BATCHES 
TO PROCESS: ' ~ partitions_batches | length) %} + {%- for batch in partitions_batches -%} + {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches|length) -%} + {%- set insert_batch_partitions -%} + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + where {{ batch }} + ); + {%- endset -%} + {%- do run_query(insert_batch_partitions) -%} + {%- endfor -%} + {%- endif -%} + SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}' {%- endmacro %} + {% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%} {% call statement('get_partitions', fetch_result=True) %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql index 879aa335..90e51d4a 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql @@ -7,7 +7,7 @@ {% set lf_tags_config = config.get('lf_tags_config') %} {% set lf_grants = config.get('lf_grants') %} - {% set partitioned_by = config.get('partitioned_by', default=none) %} + {% set partitioned_by = config.get('partitioned_by') %} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} @@ -24,16 +24,18 @@ {% set to_drop = [] %} {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) -%} + {%- do safe_create_table_as(False, target_relation, sql) -%} + {% set build_sql = "select 'SUCCESSFULLY CREATED TABLE {{ target_relation }} from scratch'" -%} {% elif existing_relation.is_view or should_full_refresh() %} {% do drop_relation(existing_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) -%} + {%- do safe_create_table_as(False, target_relation, sql) -%} + {% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE {{ target_relation }}'" -%} {% elif partitioned_by is not none and strategy == 'insert_overwrite' %} {% set tmp_relation = make_temp_relation(target_relation) %} {% if tmp_relation is not none %} {% do drop_relation(tmp_relation) %} {% endif %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {%- do safe_create_table_as(True, tmp_relation, sql) -%} {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %} {% do to_drop.append(tmp_relation) %} @@ -42,7 +44,7 @@ {% if tmp_relation is not none %} {% do drop_relation(tmp_relation) %} {% endif %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {%- do safe_create_table_as(True, tmp_relation, sql) -%} {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %} {% do to_drop.append(tmp_relation) %} {% elif strategy == 'merge' and table_type == 'iceberg' %} @@ -67,7 +69,7 @@ {% if tmp_relation is not none %} {% do drop_relation(tmp_relation) %} {% endif %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {%- do safe_create_table_as(True, tmp_relation, sql) -%} {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, 
existing_relation, delete_condition) %} {% do to_drop.append(tmp_relation) %} {% endif %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/merge.sql b/dbt/include/athena/macros/materializations/models/incremental/merge.sql index cd06cb03..0bffc5c6 100644 --- a/dbt/include/athena/macros/materializations/models/incremental/merge.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/merge.sql @@ -73,33 +73,67 @@ {%- endfor -%} {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns_wo_keys) -%} {%- set src_cols_csv = src_columns_quoted | join(', ') -%} - merge into {{ target_relation }} as target using {{ tmp_relation }} as src - on ( - {%- for key in unique_key_cols %} - target.{{ key }} = src.{{ key }} {{ "and " if not loop.last }} - {%- endfor %} - ) - {% if incremental_predicates is not none -%} - and ( - {%- for inc_predicate in incremental_predicates %} - {{ inc_predicate }} {{ "and " if not loop.last }} - {%- endfor %} - ) - {%- endif %} - {% if delete_condition is not none -%} - when matched and ({{ delete_condition }}) - then delete - {%- endif %} - when matched - then update set - {%- for col in update_columns %} - {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %} - {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }} - {%- else -%} - {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }} - {%- endif -%} - {%- endfor %} - when not matched - then insert ({{ dest_cols_csv }}) - values ({{ src_cols_csv }}); + + {%- set src_part -%} + merge into {{ target_relation }} as target using {{ tmp_relation }} as src + {%- endset -%} + + {%- set merge_part -%} + on ( + {%- for key in unique_key_cols -%} + target.{{ key }} = src.{{ key }} + {{ " and " if not loop.last }} + {%- endfor -%} + {% if incremental_predicates is not none -%} + and ( + {%- for inc_predicate in incremental_predicates %} + {{ inc_predicate }} {{ "and " if not loop.last }} + {%- endfor %} + ) + {%- endif %} + ) + {% if delete_condition is not none -%} + when matched and ({{ delete_condition }}) + then delete + {%- endif %} + when matched + then update set + {%- for col in update_columns %} + {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %} + {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }} + {%- else -%} + {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }} + {%- endif -%} + {%- endfor %} + when not matched + then insert ({{ dest_cols_csv }}) + values ({{ src_cols_csv }}) + {%- endset -%} + + {%- set merge_full -%} + {{ src_part }} + {{ merge_part }} + {%- endset -%} + + {%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%} + {%- do log('QUERY RESULT: ' ~ query_result) -%} + {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%} + {% set partitions_batches = get_partition_batches(tmp_relation) %} + {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %} + {%- for batch in partitions_batches -%} + {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%} + {%- set src_batch_part -%} + merge into {{ target_relation }} as target + using (select * from {{ tmp_relation }} where {{ batch }}) as src + {%- endset -%} + {%- set merge_batch -%} + {{ src_batch_part }} + {{ merge_part }} + {%- endset -%} + {%- do run_query(merge_batch) -%} + {%- endfor -%} + + {%- endif -%} + + SELECT 'SUCCESSFULLY 
INSERTED DATA IN {{ target_relation }}'
 {%- endmacro %}
diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
index 3d9f8137..ef641b70 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
@@ -13,12 +13,32 @@
   {%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
   {%- set new_target_types = schema_changes_dict['new_target_types'] -%}
   {% if table_type == 'iceberg' %}
+    {#
+      If the last run of alter_column_type failed while renaming the tmp column back to its original name,
+      do the rename now to protect the original column from being dropped and its data from being lost.
+    #}
+    {% for remove_col in remove_from_target_arr if remove_col.column.endswith('__dbt_alter') %}
+      {%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%}
+      {% for add_col in add_to_target_arr if add_col.column == origin_col_name %}
+        {%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%}
+        {%- do remove_from_target_arr.remove(remove_col) -%}
+        {%- do add_to_target_arr.remove(add_col) -%}
+      {% endfor %}
+    {% endfor %}
+
     {% if add_to_target_arr | length > 0 %}
       {%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
     {% endif %}
     {% if remove_from_target_arr | length > 0 %}
       {%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%}
     {% endif %}
+    {% if new_target_types != [] %}
+      {% for ntt in new_target_types %}
+        {% set column_name = ntt['column_name'] %}
+        {% set new_type = ntt['new_type'] %}
+        {% do alter_column_type(target_relation, column_name, new_type) %}
+      {% endfor %}
+    {% endif %}
   {% else %}
     {%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%}
     {% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %}
@@ -35,3 +55,34 @@
   {% endset %}
   {% do log(schema_change_message) %}
 {% endmacro %}
+
+{% macro athena__alter_column_type(relation, column_name, new_column_type) -%}
+  {#
+    1. Create a new column (w/ temp name and correct type)
+    2. Copy data over to it
+    3. Drop the existing column
+    4.
Rename the new column to existing column + #} + {%- set tmp_column = column_name + '__dbt_alter' -%} + {%- set new_ddl_data_type = ddl_data_type(new_column_type) -%} + + {#- do alter_relation_add_columns(relation, [ tmp_column ]) -#} + {%- set add_column_query -%} + alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ tmp_column }} {{ new_ddl_data_type }}); + {%- endset -%} + {%- do run_query(add_column_query) -%} + + {%- set update_query -%} + update {{ relation.render_pure() }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }}); + {%- endset -%} + {%- do run_query(update_query) -%} + + {#- do alter_relation_drop_columns(relation, [ column_name ]) -#} + {%- set drop_column_query -%} + alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }}; + {%- endset -%} + {%- do run_query(drop_column_query) -%} + + {%- do alter_relation_rename_column(relation, tmp_column, column_name, new_column_type) -%} + +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql index af730a30..f64e7543 100644 --- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -87,3 +87,42 @@ as {{ sql }} {% endmacro %} + +{% macro create_table_as_with_partitions(temporary, relation, sql) -%} + + {% set partitions_batches = get_partition_batches(sql) %} + {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %} + + {%- do log('CREATE EMPTY TABLE: ' ~ relation) -%} + {%- set create_empty_table_query -%} + {{ create_table_as(temporary, relation, sql) }} + limit 0 + {%- endset -%} + {%- do run_query(create_empty_table_query) -%} + {%- set dest_columns = adapter.get_columns_in_relation(relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + {%- for batch in partitions_batches -%} + {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%} + + {%- set insert_batch_partitions -%} + insert into {{ relation }} ({{ dest_cols_csv }}) + select {{ dest_cols_csv }} + from ({{ sql }}) + where {{ batch }} + {%- endset -%} + + {%- do run_query(insert_batch_partitions) -%} + {%- endfor -%} + + select 'SUCCESSFULLY CREATED TABLE {{ relation }}' + +{%- endmacro %} + +{% macro safe_create_table_as(temporary, relation, sql) -%} + {%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%} + {%- do log('QUERY RESULT: ' ~ query_result) -%} + {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%} + {%- do create_table_as_with_partitions(temporary, relation, sql) -%} + {%- endif -%} +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/table/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql index f371c5a7..ee82a633 100644 --- a/dbt/include/athena/macros/materializations/models/table/table.sql +++ b/dbt/include/athena/macros/materializations/models/table/table.sql @@ -49,28 +49,22 @@ {%- endif -%} -- create tmp table - {% call statement('main') -%} - {{ create_table_as(False, tmp_relation, sql) }} - {%- endcall %} + {%- do safe_create_table_as(False, tmp_relation, sql) -%} -- swap table - {%- set swap_table = adapter.swap_table(tmp_relation, - target_relation) -%} + {%- set swap_table = adapter.swap_table(tmp_relation, target_relation) -%} -- delete glue tmp 
table, do not use drop_relation, as it will remove data of the target table {%- do adapter.delete_from_glue_catalog(tmp_relation) -%} - {% do adapter.expire_glue_table_versions(target_relation, - versions_to_keep, - True) %} + {% do adapter.expire_glue_table_versions(target_relation, versions_to_keep, True) %} + {%- else -%} -- Here we are in the case of non-ha tables or ha tables but in case of full refresh. {%- if old_relation is not none -%} {{ drop_relation(old_relation) }} {%- endif -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall %} + {%- do safe_create_table_as(False, target_relation, sql) -%} {%- endif -%} {{ set_table_classification(target_relation) }} @@ -78,35 +72,42 @@ {%- else -%} {%- if old_relation is none -%} - {%- call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall %} + {%- do safe_create_table_as(False, target_relation, sql) -%} {%- else -%} - {%- if tmp_relation is not none -%} - {%- do drop_relation(tmp_relation) -%} - {%- endif -%} - - {%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%} - -- If we have this, it means that at least the first renaming occurred but there was an issue - -- afterwards, therefore we are in weird state. The easiest and cleanest should be to remove - -- the backup relation. It won't have an impact because since we are in the else condition, - -- that means that old relation exists therefore no downtime yet. - {%- if old_relation_bkp is not none -%} - {%- do drop_relation(old_relation_bkp) -%} + {%- if old_relation.is_view -%} + {%- do safe_create_table_as(False, tmp_relation, sql) -%} + {%- do drop_relation(old_relation) -%} + {%- do rename_relation(tmp_relation, target_relation) -%} + {%- else -%} + + {%- if tmp_relation is not none -%} + {%- do drop_relation(tmp_relation) -%} + {%- endif -%} + + {%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%} + -- If we have this, it means that at least the first renaming occurred but there was an issue + -- afterwards, therefore we are in weird state. The easiest and cleanest should be to remove + -- the backup relation. It won't have an impact because since we are in the else condition, + -- that means that old relation exists therefore no downtime yet. 
+ {%- if old_relation_bkp is not none -%} + {%- do drop_relation(old_relation_bkp) -%} + {%- endif -%} + + {%- do safe_create_table_as(False, tmp_relation, sql) -%} + + {{ rename_relation(old_relation, old_relation_bkp) }} + {{ rename_relation(tmp_relation, target_relation) }} + + {{ drop_relation(old_relation_bkp) }} {%- endif -%} - - {%- call statement('main') -%} - {{ create_table_as(False, tmp_relation, sql) }} - {%- endcall -%} - - {{ rename_relation(old_relation, old_relation_bkp) }} - {{ rename_relation(tmp_relation, target_relation) }} - - {{ drop_relation(old_relation_bkp) }} {%- endif -%} {%- endif -%} + {% call statement("main") %} + SELECT 'SUCCESSFULLY CREATED TABLE {{ target_relation }}'; + {% endcall %} + {{ run_hooks(post_hooks) }} {% if lf_tags_config is not none %} diff --git a/dbt/include/athena/macros/materializations/models/view/view.sql b/dbt/include/athena/macros/materializations/models/view/view.sql index 1cf3b337..3b1a4a89 100644 --- a/dbt/include/athena/macros/materializations/models/view/view.sql +++ b/dbt/include/athena/macros/materializations/models/view/view.sql @@ -2,6 +2,7 @@ {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} {% set target_relation = this.incorporate(type='view') %} + {% do persist_docs(target_relation, model) %} {% do return(to_return) %} {%- endmaterialization %} diff --git a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql index 208f3e9d..08d41c57 100644 --- a/dbt/include/athena/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -137,11 +137,6 @@ identifier=target_table, type='table') -%} - - {% if not adapter.check_schema_exists(model.database, model.schema) %} - {% do create_schema(model.database, model.schema) %} - {% endif %} - {%- if not target_relation.is_table -%} {% do exceptions.relation_wrong_type(target_relation, 'table') %} {%- endif -%} diff --git a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql index a97c489c..ec726627 100644 --- a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql +++ b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql @@ -1,7 +1,9 @@ {# Athena has different types between DML and DDL #} {# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #} {% macro ddl_data_type(col_type) -%} - -- transform varchar + {%- set table_type = config.get('table_type', 'hive') -%} + + -- transform varchar {% set re = modules.re %} {% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %} @@ -15,6 +17,11 @@ {% set data_type = data_type.replace('integer', 'int') -%} {%- endif -%} + -- transform timestamp + {%- if table_type == 'iceberg' and 'timestamp' in data_type -%} + {% set data_type = 'timestamp' -%} + {%- endif -%} + {{ return(data_type) }} {% endmacro %} diff --git a/dev-requirements.txt b/dev-requirements.txt index b6f2d5b4..3dfa7e77 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,10 +1,10 @@ autoflake~=1.7 black~=23.3 -dbt-tests-adapter~=1.5.2 +dbt-tests-adapter~=1.5.4 flake8~=5.0 Flake8-pyproject~=1.2 isort~=5.11 -moto~=4.1.12 +moto~=4.1.14 pre-commit~=2.21 pyparsing~=3.1.0 pytest~=7.4 diff --git a/tests/functional/adapter/test_change_relation_types.py b/tests/functional/adapter/test_change_relation_types.py new file mode 100644 index 00000000..047cac75 --- /dev/null +++ 
b/tests/functional/adapter/test_change_relation_types.py
@@ -0,0 +1,26 @@
+import pytest
+
+from dbt.tests.adapter.relations.test_changing_relation_type import (
+    BaseChangeRelationTypeValidator,
+)
+
+
+class TestChangeRelationTypesHive(BaseChangeRelationTypeValidator):
+    pass
+
+
+class TestChangeRelationTypesIceberg(BaseChangeRelationTypeValidator):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+table_type": "iceberg",
+            }
+        }
+
+    def test_changing_materialization_changes_relation_type(self, project):
+        self._run_and_check_materialization("view")
+        self._run_and_check_materialization("table")
+        self._run_and_check_materialization("view")
+        # skip incremental, which doesn't work with Iceberg
+        self._run_and_check_materialization("table", extra_args=["--full-refresh"])
diff --git a/tests/unit/test_connection_manager.py b/tests/unit/test_connection_manager.py
index a0ede751..c37a4792 100644
--- a/tests/unit/test_connection_manager.py
+++ b/tests/unit/test_connection_manager.py
@@ -4,7 +4,7 @@
 from pyathena.model import AthenaQueryExecution
 
 from dbt.adapters.athena import AthenaConnectionManager
-from dbt.contracts.connection import AdapterResponse
+from dbt.adapters.athena.connections import AthenaAdapterResponse
 
 
 class TestAthenaConnectionManager:
@@ -19,11 +19,13 @@ def test_get_response(self, state, result):
         cursor = mock.MagicMock()
         cursor.rowcount = 1
         cursor.state = state
+        cursor.data_scanned_in_bytes = 123
         cm = AthenaConnectionManager(mock.MagicMock())
         response = cm.get_response(cursor)
-        assert isinstance(response, AdapterResponse)
+        assert isinstance(response, AthenaAdapterResponse)
         assert response.code == result
         assert response.rows_affected == 1
+        assert response.data_scanned_in_bytes == 123
 
     def test_data_type_code_to_name(self):
         cm = AthenaConnectionManager(mock.MagicMock())
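
For context on the partition-batching fallback introduced above: get_partition_batches reads partitions_limit from the model config (default 100), selects the distinct partition tuples produced by the model SQL, and groups them into OR'ed predicates of at most partitions_limit partitions each, so a statement that failed with TOO_MANY_OPEN_PARTITIONS can be replayed as several smaller inserts. The following is only a minimal Python sketch of that grouping logic; the function and argument names (build_partition_batches, rows, keys) are hypothetical and not part of the diff.

from typing import List, Sequence


def build_partition_batches(
    rows: Sequence[Sequence[str]], keys: Sequence[str], partitions_limit: int = 100
) -> List[str]:
    batches: List[str] = []
    current: List[str] = []
    for row in rows:
        # One "(k1=v1 and k2=v2)" predicate per distinct partition tuple
        predicate = " and ".join(f"{key}={value}" for key, value in zip(keys, row))
        current.append(f"({predicate})")
        # Close the batch once it reaches the partition limit
        if len(current) == partitions_limit:
            batches.append(" or ".join(current))
            current = []
    if current:
        batches.append(" or ".join(current))
    return batches


# 250 distinct partition values with the default limit of 100 -> 3 batched inserts
batches = build_partition_batches([[str(i)] for i in range(250)], ["id"])
assert len(batches) == 3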
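
The batch predicates rely on format_one_partition_key, added to impl.py above, to translate Iceberg hidden-partitioning transforms (hour/day/month/year) into expressions Athena can evaluate in the SELECT DISTINCT and WHERE clauses. A small standalone illustration of that mapping, mirroring the function body from the diff:

import re


def format_one_partition_key(partition_key: str) -> str:
    # Hidden partitioning transforms are rewritten as date_trunc expressions;
    # plain partition columns are simply lower-cased.
    hidden = re.search(r"^(hour|day|month|year)\((.+)\)", partition_key.lower())
    return f"date_trunc('{hidden.group(1)}', {hidden.group(2)})" if hidden else partition_key.lower()


assert format_one_partition_key("day(created_at)") == "date_trunc('day', created_at)"
assert format_one_partition_key("COUNTRY") == "country"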