fix: add back rowcounts and byte scanned to dbt cli and run_results.json #375

Merged: 9 commits, Aug 22, 2023
30 changes: 27 additions & 3 deletions dbt/adapters/athena/connections.py
@@ -1,4 +1,6 @@
import hashlib
+import json
+import re
import time
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import contextmanager
@@ -237,13 +239,35 @@ def open(cls, connection: Connection) -> Connection:
    @classmethod
    def get_response(cls, cursor: AthenaCursor) -> AthenaAdapterResponse:
        code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR"
+        rowcount, data_scanned_in_bytes = cls.process_query_stats(cursor)
        return AthenaAdapterResponse(
-            _message=f"{code} {cursor.rowcount}",
-            rows_affected=cursor.rowcount,
+            _message=f"{code} {rowcount}",
+            rows_affected=rowcount,
            code=code,
-            data_scanned_in_bytes=cursor.data_scanned_in_bytes,
+            data_scanned_in_bytes=data_scanned_in_bytes,
        )

+    @staticmethod
+    def process_query_stats(cursor: AthenaCursor) -> Tuple[int, int]:
+        """
+        Helper function to parse query statistics from SELECT statements.
+        Looks for queries that contain both rowcount and data_scanned_in_bytes,
+        takes the text after the last SELECT, and parses the JSON payload
+        between the curly brackets.
+        """
+        if all(map(cursor.query.__contains__, ["rowcount", "data_scanned_in_bytes"])):
+            try:
+                query_split = cursor.query.lower().split("select")[-1]
+                # query statistics are in the format {"rowcount":1, "data_scanned_in_bytes": 3};
+                # the following statement extracts the content between { and }
+                query_stats = re.search("{(.*)}", query_split)
+                if query_stats:
+                    stats = json.loads("{" + query_stats.group(1) + "}")
+                    return stats.get("rowcount", -1), stats.get("data_scanned_in_bytes", 0)
+            except Exception as err:
+                logger.debug(f"There was an error parsing query stats {err}")
+                return -1, 0
+        return cursor.rowcount, cursor.data_scanned_in_bytes

    def cancel(self, connection: Connection) -> None:
        connection.handle.cancel()

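Note: the round trip works because the materialization's final statement is a SELECT over the stats JSON string literal, so the counts can be recovered from the executed query text itself. A minimal standalone sketch of the same extraction (the query string below is hypothetical, standing in for cursor.query):

import json
import re

# Hypothetical final query text, as a materialization macro would render it:
# the last statement selects the stats JSON as a string literal.
query = """SELECT '{"rowcount":212,"data_scanned_in_bytes":7293}'"""

# Same strategy as process_query_stats above: take the text after the
# last "select", then parse the {...} payload as JSON.
payload = re.search("{(.*)}", query.lower().split("select")[-1])
if payload:
    stats = json.loads("{" + payload.group(1) + "}")
    print(stats.get("rowcount", -1), stats.get("data_scanned_in_bytes", 0))  # 212 7293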
2 changes: 1 addition & 1 deletion dbt/adapters/athena/impl.py
@@ -944,7 +944,7 @@ def run_query_with_partitions_limit_catching(self, sql: str) -> str:
if "TOO_MANY_OPEN_PARTITIONS" in str(e):
return "TOO_MANY_OPEN_PARTITIONS"
raise e
return "SUCCESS"
return f'{{"rowcount":{cursor.rowcount},"data_scanned_in_bytes":{cursor.data_scanned_in_bytes}}}'

    @available
    def format_partition_keys(self, partition_keys: List[str]) -> str:
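With this change, run_query_with_partitions_limit_catching returns the stats JSON string instead of the literal "SUCCESS". The materialization macros below capture that value in query_result and echo it in their final SELECT '...', which is what lets process_query_stats in connections.py pick the counts back up from the executed query text.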
@@ -54,7 +54,7 @@
        {%- do run_query(insert_batch_partitions) -%}
    {%- endfor -%}
{%- endif -%}
-SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
+SELECT '{{query_result}}'
{%- endmacro %}


@@ -24,18 +24,18 @@

{% set to_drop = [] %}
{% if existing_relation is none %}
-    {%- do safe_create_table_as(False, target_relation, sql) -%}
-    {% set build_sql = "select 'SUCCESSFULLY CREATED TABLE {{ target_relation }} from scratch'" -%}
+    {% set query_result = safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select '{{ query_result }}'" -%}
{% elif existing_relation.is_view or should_full_refresh() %}
    {% do drop_relation(existing_relation) %}
-    {%- do safe_create_table_as(False, target_relation, sql) -%}
-    {% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE {{ target_relation }}'" -%}
+    {% set query_result = safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select '{{ query_result }}'" -%}
{% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
    {% set tmp_relation = make_temp_relation(target_relation) %}
    {% if tmp_relation is not none %}
        {% do drop_relation(tmp_relation) %}
    {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
    {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
    {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
    {% do to_drop.append(tmp_relation) %}
@@ -44,7 +44,7 @@
    {% if tmp_relation is not none %}
        {% do drop_relation(tmp_relation) %}
    {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
    {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
    {% do to_drop.append(tmp_relation) %}
{% elif strategy == 'merge' and table_type == 'iceberg' %}
@@ -69,7 +69,7 @@
    {% if tmp_relation is not none %}
        {% do drop_relation(tmp_relation) %}
    {% endif %}
-    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
+    {% set query_result = safe_create_table_as(True, tmp_relation, sql) -%}
    {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
    {% do to_drop.append(tmp_relation) %}
{% endif %}
@@ -132,8 +132,7 @@
        {%- endset -%}
        {%- do run_query(merge_batch) -%}
    {%- endfor -%}
-
{%- endif -%}

-SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
+SELECT '{{query_result}}'
{%- endmacro %}
@@ -124,5 +124,7 @@
    {%- do log('QUERY RESULT: ' ~ query_result) -%}
    {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
        {%- do create_table_as_with_partitions(temporary, relation, sql) -%}
+        {%- set query_result = '{{ relation }} with many partitions created' -%}
    {%- endif -%}
+    {{ return(query_result) }}
{%- endmacro %}
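One caveat visible in this hunk: when the TOO_MANY_OPEN_PARTITIONS fallback fires, query_result is replaced with a plain status string that contains neither rowcount nor data_scanned_in_bytes, so process_query_stats falls back to the cursor's own statistics for that final SELECT.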
@@ -49,7 +49,7 @@
{%- endif -%}

-- create tmp table
-{%- do safe_create_table_as(False, tmp_relation, sql) -%}
+{%- set query_result = safe_create_table_as(False, tmp_relation, sql) -%}

-- swap table
{%- set swap_table = adapter.swap_table(tmp_relation, target_relation) -%}
@@ -64,18 +64,18 @@
        {%- if old_relation is not none -%}
            {{ drop_relation(old_relation) }}
        {%- endif -%}
-        {%- do safe_create_table_as(False, target_relation, sql) -%}
+        {%- set query_result = safe_create_table_as(False, target_relation, sql) -%}
    {%- endif -%}

    {{ set_table_classification(target_relation) }}

{%- else -%}

    {%- if old_relation is none -%}
-        {%- do safe_create_table_as(False, target_relation, sql) -%}
+        {%- set query_result = safe_create_table_as(False, target_relation, sql) -%}
    {%- else -%}
        {%- if old_relation.is_view -%}
-            {%- do safe_create_table_as(False, tmp_relation, sql) -%}
+            {%- set query_result = safe_create_table_as(False, tmp_relation, sql) -%}
            {%- do drop_relation(old_relation) -%}
            {%- do rename_relation(tmp_relation, target_relation) -%}
        {%- else -%}
@@ -93,7 +93,7 @@
            {%- do drop_relation(old_relation_bkp) -%}
        {%- endif -%}

-        {%- do safe_create_table_as(False, tmp_relation, sql) -%}
+        {% set query_result = safe_create_table_as(False, tmp_relation, sql) %}

        {{ rename_relation(old_relation, old_relation_bkp) }}
        {{ rename_relation(tmp_relation, target_relation) }}
@@ -105,7 +105,7 @@
    {%- endif -%}

    {% call statement("main") %}
-        SELECT 'SUCCESSFULLY CREATED TABLE {{ target_relation }}';
+        SELECT '{{ query_result }}';
    {% endcall %}

{{ run_hooks(post_hooks) }}
127 changes: 127 additions & 0 deletions tests/functional/adapter/test_partitions.py
@@ -0,0 +1,127 @@
import pytest

from dbt.contracts.results import RunStatus
from dbt.tests.util import run_dbt

# this query generates 212 records: one per day from 2023-01-01 through 2023-07-31 inclusive
test_partitions_model_sql = """
select
    random() as rnd,
    cast(date_column as date) as date_column,
    doy(date_column) as doy
from (
    values (
        sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
    )
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""


class TestHiveTablePartitions:
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"models": {"+table_type": "hive", "+materialized": "table", "+partitioned_by": ["date_column", "doy"]}}

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_hive_partitions.sql": test_partitions_model_sql,
        }

    def test__check_incremental_run_with_partitions(self, project):
        relation_name = "test_hive_partitions"
        model_run_result_row_count_query = "select count(*) as records from {}.{}".format(
            project.test_schema, relation_name
        )

        first_model_run = run_dbt(["run", "--select", relation_name])
        first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
        assert first_model_run_result.status == RunStatus.Success

        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

        assert records_count_first_run == 212


class TestIcebergTablePartitions:
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+table_type": "iceberg",
                "+materialized": "table",
                "+partitioned_by": ["DAY(date_column)", "doy"],
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_iceberg_partitions.sql": test_partitions_model_sql,
        }

    def test__check_incremental_run_with_partitions(self, project):
        relation_name = "test_iceberg_partitions"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        first_model_run = run_dbt(["run", "--select", relation_name])
        first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
        assert first_model_run_result.status == RunStatus.Success

        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

        assert records_count_first_run == 212


class TestIcebergIncrementalPartitions:
    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "models": {
                "+table_type": "iceberg",
                "+materialized": "incremental",
                "+incremental_strategy": "merge",
                "+unique_key": "doy",
                "+partitioned_by": ["DAY(date_column)", "doy"],
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_iceberg_partitions_incremental.sql": test_partitions_model_sql,
        }

    def test__check_incremental_run_with_partitions(self, project):
        """
        Check that the incremental run works with iceberg and partitioned datasets
        """
        relation_name = "test_iceberg_partitions_incremental"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        first_model_run = run_dbt(["run", "--select", relation_name, "--full-refresh"])
        first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
        assert first_model_run_result.status == RunStatus.Success

        records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

        assert records_count_first_run == 212

        incremental_model_run = run_dbt(["run", "--select", relation_name])
        incremental_model_run_result = incremental_model_run.results[0]

        # check that the model ran successfully after the incremental run
        assert incremental_model_run_result.status == RunStatus.Success

        incremental_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

        assert incremental_records_count == 212
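For reference, the 212 expected by each assertion is just the number of days the model's sequence() spans: 31 + 28 + 31 + 30 + 31 + 30 + 31 = 212 for January through July 2023. A quick standalone check:

from datetime import date

# One row per day, both endpoints inclusive, hence the +1.
print((date(2023, 7, 31) - date(2023, 1, 1)).days + 1)  # 212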