diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py
index 17e65ad5..d7c52ca0 100644
--- a/dbt/adapters/athena/connections.py
+++ b/dbt/adapters/athena/connections.py
@@ -1,4 +1,6 @@
 import hashlib
+import json
+import re
 import time
 from concurrent.futures.thread import ThreadPoolExecutor
 from contextlib import contextmanager
@@ -237,13 +239,35 @@ def open(cls, connection: Connection) -> Connection:
 
     @classmethod
     def get_response(cls, cursor: AthenaCursor) -> AthenaAdapterResponse:
         code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR"
+        rowcount, data_scanned_in_bytes = cls.process_query_stats(cursor)
         return AthenaAdapterResponse(
-            _message=f"{code} {cursor.rowcount}",
-            rows_affected=cursor.rowcount,
+            _message=f"{code} {rowcount}",
+            rows_affected=rowcount,
             code=code,
-            data_scanned_in_bytes=cursor.data_scanned_in_bytes,
+            data_scanned_in_bytes=data_scanned_in_bytes,
         )
 
+    @staticmethod
+    def process_query_stats(cursor: AthenaCursor) -> Tuple[int, int]:
+        """
+        Helper function to parse query statistics from SELECT statements.
+        The function checks whether the query contains both rowcount and data_scanned_in_bytes,
+        then strips the leading SELECT and picks the values between the curly brackets.
+        """
+        if all(map(cursor.query.__contains__, ["rowcount", "data_scanned_in_bytes"])):
+            try:
+                query_split = cursor.query.lower().split("select")[-1]
+                # query statistics are in the format {"rowcount":1, "data_scanned_in_bytes": 3}
+                # the following statement extracts the content between { and }
+                query_stats = re.search("{(.*)}", query_split)
+                if query_stats:
+                    stats = json.loads("{" + query_stats.group(1) + "}")
+                    return stats.get("rowcount", -1), stats.get("data_scanned_in_bytes", 0)
+            except Exception as err:
+                logger.debug(f"There was an error parsing query stats {err}")
+                return -1, 0
+        return cursor.rowcount, cursor.data_scanned_in_bytes
+
     def cancel(self, connection: Connection) -> None:
         connection.handle.cancel()
diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py
index 676ec2d1..96b49178 100755
--- a/dbt/adapters/athena/impl.py
+++ b/dbt/adapters/athena/impl.py
@@ -944,7 +944,7 @@ def run_query_with_partitions_limit_catching(self, sql: str) -> str:
             if "TOO_MANY_OPEN_PARTITIONS" in str(e):
                 return "TOO_MANY_OPEN_PARTITIONS"
             raise e
-        return "SUCCESS"
+        return f'{{"rowcount":{cursor.rowcount},"data_scanned_in_bytes":{cursor.data_scanned_in_bytes}}}'
 
     @available
     def format_partition_keys(self, partition_keys: List[str]) -> str:
diff --git a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
index 8c253dcf..76c2ed73 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
@@ -54,7 +54,7 @@
       {%- do run_query(insert_batch_partitions) -%}
     {%- endfor -%}
   {%- endif -%}
-  SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
+  SELECT '{{query_result}}'
 {%- endmacro %}
 
 
diff --git a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
index 90e51d4a..b42c7cb9 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
@@ -24,18 +24,18 @@
   {% set to_drop = [] %}
 
   {% if existing_relation is none %}
-    {%- do safe_create_table_as(False, target_relation, sql) -%}
-    {% set build_sql = "select 'SUCCESSFULLY CREATED TABLE {{ target_relation }} from scratch'" -%}
+    {% set query_result = safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select '{{ query_result }}'" -%}
   {% elif existing_relation.is_view or should_full_refresh() %}
     {% do drop_relation(existing_relation) %}
-    {%- do safe_create_table_as(False, target_relation, sql) -%}
-    {% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE {{ target_relation }}'" -%}
+    {% set query_result = safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select '{{ query_result }}'" -%}
   {% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
     {% set tmp_relation = make_temp_relation(target_relation) %}
     {% if tmp_relation is not none %}
      {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
     {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
     {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
     {% do to_drop.append(tmp_relation) %}
@@ -44,7 +44,7 @@
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
     {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
     {% do to_drop.append(tmp_relation) %}
   {% elif strategy == 'merge' and table_type == 'iceberg' %}
@@ -69,7 +69,7 @@
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
     {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
     {% do to_drop.append(tmp_relation) %}
   {% endif %}
diff --git a/dbt/include/athena/macros/materializations/models/incremental/merge.sql b/dbt/include/athena/macros/materializations/models/incremental/merge.sql
index 0bffc5c6..07d8b886 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/merge.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/merge.sql
@@ -132,8 +132,7 @@
     {%- endset -%}
     {%- do run_query(merge_batch) -%}
   {%- endfor -%}
-  {%- endif -%}
-  SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
+  SELECT '{{query_result}}'
 {%- endmacro %}
 
 
diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
index f64e7543..878a0378 100644
--- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
+++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
@@ -124,5 +124,7 @@
   {%- do log('QUERY RESULT: ' ~ query_result) -%}
   {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
     {%- do create_table_as_with_partitions(temporary, relation, sql) -%}
+    {%- set query_result = '{{ relation }} with many partitions created' -%}
   {%- endif -%}
+  {{ return(query_result) }}
 {%- endmacro %}
diff --git a/dbt/include/athena/macros/materializations/models/table/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql
index ee82a633..989bf63b 100644
--- a/dbt/include/athena/macros/materializations/models/table/table.sql
+++ b/dbt/include/athena/macros/materializations/models/table/table.sql
@@ -49,7 +49,7 @@
     {%- endif -%}
 
     -- create tmp table
-    {%- do safe_create_table_as(False, tmp_relation, sql) -%}
+    {%- set query_result = safe_create_table_as(False, tmp_relation, sql) -%}
 
     -- swap table
     {%- set swap_table = adapter.swap_table(tmp_relation, target_relation) -%}
@@ -64,7 +64,7 @@
       {%- if old_relation is not none -%}
         {{ drop_relation(old_relation) }}
       {%- endif -%}
-      {%- do safe_create_table_as(False, target_relation, sql) -%}
+      {%- set query_result = safe_create_table_as(False, target_relation, sql) -%}
     {%- endif -%}
 
     {{ set_table_classification(target_relation) }}
@@ -72,10 +72,10 @@
   {%- else -%}
 
     {%- if old_relation is none -%}
-      {%- do safe_create_table_as(False, target_relation, sql) -%}
+      {%- set query_result = safe_create_table_as(False, target_relation, sql) -%}
     {%- else -%}
       {%- if old_relation.is_view -%}
-        {%- do safe_create_table_as(False, tmp_relation, sql) -%}
+        {%- set query_result = safe_create_table_as(False, tmp_relation, sql) -%}
         {%- do drop_relation(old_relation) -%}
         {%- do rename_relation(tmp_relation, target_relation) -%}
       {%- else -%}
@@ -93,7 +93,7 @@
           {%- do drop_relation(old_relation_bkp) -%}
         {%- endif -%}
 
-        {%- do safe_create_table_as(False, tmp_relation, sql) -%}
+        {% set query_result = safe_create_table_as(False, tmp_relation, sql) %}
 
         {{ rename_relation(old_relation, old_relation_bkp) }}
         {{ rename_relation(tmp_relation, target_relation) }}
@@ -105,7 +105,7 @@
   {%- endif -%}
 
   {% call statement("main") %}
-    SELECT 'SUCCESSFULLY CREATED TABLE {{ target_relation }}';
+    SELECT '{{ query_result }}';
   {% endcall %}
 
   {{ run_hooks(post_hooks) }}
diff --git a/tests/functional/adapter/test_partitions.py b/tests/functional/adapter/test_partitions.py
new file mode 100644
index 00000000..68e639e6
--- /dev/null
+++ b/tests/functional/adapter/test_partitions.py
@@ -0,0 +1,127 @@
+import pytest
+
+from dbt.contracts.results import RunStatus
+from dbt.tests.util import run_dbt
+
+# this query generates 212 records (one per day between 2023-01-01 and 2023-07-31)
+test_partitions_model_sql = """
+select
+    random() as rnd,
+    cast(date_column as date) as date_column,
+    doy(date_column) as doy
+from (
+    values (
+        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
+    )
+) as t1(date_array)
+cross join unnest(date_array) as t2(date_column)
+"""
+
+
+class TestHiveTablePartitions:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"+table_type": "hive", "+materialized": "table", "+partitioned_by": ["date_column", "doy"]}}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "test_hive_partitions.sql": test_partitions_model_sql,
+        }
+
+    def test__check_incremental_run_with_partitions(self, project):
+        relation_name = "test_hive_partitions"
+        model_run_result_row_count_query = "select count(*) as records from {}.{}".format(
+            project.test_schema, relation_name
+        )
+
+        first_model_run = run_dbt(["run", "--select", relation_name])
+        first_model_run_result = first_model_run.results[0]
+
+        # check that the model ran successfully
+        assert first_model_run_result.status == RunStatus.Success
+
+        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert records_count_first_run == 212
+
+
+class TestIcebergTablePartitions:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+table_type": "iceberg",
+                "+materialized": "table",
+                "+partitioned_by": ["DAY(date_column)", "doy"],
+            }
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "test_iceberg_partitions.sql": test_partitions_model_sql,
+        }
+
+    def test__check_incremental_run_with_partitions(self, project):
+        relation_name = "test_iceberg_partitions"
+        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
+
+        first_model_run = run_dbt(["run", "--select", relation_name])
+        first_model_run_result = first_model_run.results[0]
+
+        # check that the model ran successfully
+        assert first_model_run_result.status == RunStatus.Success
+
+        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert records_count_first_run == 212
+
+
+class TestIcebergIncrementalPartitions:
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+table_type": "iceberg",
+                "+materialized": "incremental",
+                "+incremental_strategy": "merge",
+                "+unique_key": "doy",
+                "+partitioned_by": ["DAY(date_column)", "doy"],
+            }
+        }
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "test_iceberg_partitions_incremental.sql": test_partitions_model_sql,
+        }
+
+    def test__check_incremental_run_with_partitions(self, project):
+        """
+        Check that the incremental run works with Iceberg and partitioned datasets
+        """
+
+        relation_name = "test_iceberg_partitions_incremental"
+        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
+
+        first_model_run = run_dbt(["run", "--select", relation_name, "--full-refresh"])
+        first_model_run_result = first_model_run.results[0]
+
+        # check that the model ran successfully
+        assert first_model_run_result.status == RunStatus.Success
+
+        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert records_count_first_run == 212
+
+        incremental_model_run = run_dbt(["run", "--select", relation_name])
+
+        incremental_model_run_result = incremental_model_run.results[0]
+
+        # check that the model ran successfully after the incremental run
+        assert incremental_model_run_result.status == RunStatus.Success
+
+        incremental_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
+
+        assert incremental_records_count == 212