Merge branch 'main' into dependabot/pip/pydantic-gte-1.10-and-lt-3.0
nicor88 authored Aug 15, 2023
2 parents e28099a + ea3cd1d commit f4feda6
Showing 18 changed files with 437 additions and 111 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -25,7 +25,6 @@
* Does **not** support the use of `unique_key`
* Supports [snapshots][snapshots]
* Does not support [Python models][python-models]
* Does not support [persist docs][persist-docs] for views

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
[incremental]: https://docs.getdbt.com/docs/build/incremental-models
23 changes: 20 additions & 3 deletions dbt/adapters/athena/connections.py
@@ -37,6 +37,11 @@
logger = AdapterLogger("Athena")


@dataclass
class AthenaAdapterResponse(AdapterResponse):
data_scanned_in_bytes: Optional[int] = None


@dataclass
class AthenaCredentials(Credentials):
s3_staging_dir: str
@@ -127,6 +132,7 @@ def execute( # type: ignore
endpoint_url: Optional[str] = None,
cache_size: int = 0,
cache_expiration_time: int = 0,
catch_partitions_limit: bool = False,
**kwargs,
):
def inner() -> AthenaCursor:
@@ -153,7 +159,12 @@ def inner() -> AthenaCursor:
return self

retry = tenacity.Retrying(
retry=retry_if_exception(lambda _: True),
# No need to retry if TOO_MANY_OPEN_PARTITIONS occurs.
# Otherwise, Athena throws ICEBERG_FILESYSTEM_ERROR after a retry,
# because not all files are removed immediately after the first attempt to create the table
retry=retry_if_exception(
lambda e: not (catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e))
),
stop=stop_after_attempt(self._retry_config.attempt),
wait=wait_exponential(
multiplier=self._retry_config.attempt,
@@ -198,6 +209,7 @@ def open(cls, connection: Connection) -> Connection:
handle = AthenaConnection(
s3_staging_dir=creds.s3_staging_dir,
endpoint_url=creds.endpoint_url,
catalog_name=creds.database,
schema_name=creds.schema,
work_group=creds.work_group,
cursor_class=AthenaCursor,
@@ -223,9 +235,14 @@ def open(cls, connection: Connection) -> Connection:
return connection

@classmethod
def get_response(cls, cursor: AthenaCursor) -> AdapterResponse:
def get_response(cls, cursor: AthenaCursor) -> AthenaAdapterResponse:
code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR"
return AdapterResponse(_message=f"{code} {cursor.rowcount}", rows_affected=cursor.rowcount, code=code)
return AthenaAdapterResponse(
_message=f"{code} {cursor.rowcount}",
rows_affected=cursor.rowcount,
code=code,
data_scanned_in_bytes=cursor.data_scanned_in_bytes,
)

def cancel(self, connection: Connection) -> None:
connection.handle.cancel()
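The new AthenaAdapterResponse subclass threads the bytes scanned by each query through to dbt's run results. A minimal, self-contained sketch of how it could be consumed, using a simplified stand-in for dbt's AdapterResponse (only the field names come from the diff above; the rest is illustrative):

from dataclasses import dataclass
from typing import Optional


@dataclass
class AdapterResponse:  # simplified stand-in for dbt's AdapterResponse
    _message: str
    code: Optional[str] = None
    rows_affected: Optional[int] = None


@dataclass
class AthenaAdapterResponse(AdapterResponse):
    data_scanned_in_bytes: Optional[int] = None


# Built from a succeeded cursor, as in get_response() above
response = AthenaAdapterResponse(
    _message="OK 42", code="OK", rows_affected=42, data_scanned_in_bytes=1_048_576
)
# Athena bills per byte scanned, so surfacing this field makes run results
# usable for cost monitoring.
print(f"{response.data_scanned_in_bytes / 1024 ** 2:.1f} MiB scanned")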
103 changes: 85 additions & 18 deletions dbt/adapters/athena/impl.py
@@ -1,6 +1,7 @@
import csv
import os
import posixpath as path
import re
import tempfile
from itertools import chain
from textwrap import dedent
@@ -15,9 +16,11 @@
from mypy_boto3_glue.type_defs import (
ColumnTypeDef,
GetTableResponseTypeDef,
TableInputTypeDef,
TableTypeDef,
TableVersionTypeDef,
)
from pyathena.error import OperationalError

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
@@ -636,35 +639,66 @@ def persist_docs_to_glue(
model: Dict[str, Any],
persist_relation_docs: bool = False,
persist_column_docs: bool = False,
skip_archive_table_version: bool = False,
) -> None:
"""Save model/columns description to Glue Table metadata.
:param skip_archive_table_version: if True, current table version will not be archived before creating new one.
The purpose is to avoid creating redundant table version if it already was created during the same dbt run
after CREATE OR REPLACE VIEW or ALTER TABLE statements.
Every dbt run should create not more than one table version.
"""
conn = self.connections.get_thread_connection()
client = conn.handle

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table")
updated_table = {
"Name": table["Name"],
"StorageDescriptor": table["StorageDescriptor"],
"PartitionKeys": table.get("PartitionKeys", []),
"TableType": table["TableType"],
"Parameters": table.get("Parameters", {}),
"Description": table.get("Description", ""),
}
# By default, there is no need to update the Glue Table
need_update_table = False
# Get Table from Glue
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
# Update table description
if persist_relation_docs:
table_comment = clean_sql_comment(model["description"])
updated_table["Description"] = table_comment
updated_table["Parameters"]["comment"] = table_comment

# Prepare dbt description
clean_table_description = clean_sql_comment(model["description"])
# Get current description from Glue
glue_table_description = table.get("Description", "")
# Get current description parameter from Glue
glue_table_comment = table["Parameters"].get("comment", "")
# Update description if it's different
if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
updated_table["Description"] = clean_table_description
updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
updated_table_parameters["comment"] = clean_table_description
updated_table["Parameters"] = updated_table_parameters
need_update_table = True

# Update column comments
if persist_column_docs:
# Process every column
for col_obj in updated_table["StorageDescriptor"]["Columns"]:
# Get column description from dbt
col_name = col_obj["Name"]
col_comment = model["columns"].get(col_name, {}).get("description")
if col_comment:
col_obj["Comment"] = clean_sql_comment(col_comment)

glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table)
if col_name in model["columns"]:
col_comment = model["columns"][col_name]["description"]
# Prepare column description from dbt
clean_col_comment = clean_sql_comment(col_comment)
# Get current column comment from Glue
glue_col_comment = col_obj.get("Comment", "")
# Update column description if it's different
if glue_col_comment != clean_col_comment:
col_obj["Comment"] = clean_col_comment
need_update_table = True

# Update the Glue Table only if a table or column description was modified.
# This prevents creating redundant schema versions after incremental runs.
if need_update_table:
glue_client.update_table(
DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
)

@available
def list_schemas(self, database: str) -> List[str]:
@@ -833,3 +867,36 @@ def is_list(self, value: Any) -> bool:
a list since this is complicated with purely Jinja syntax.
"""
return isinstance(value, list)

@staticmethod
def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
"""
Prepare the Glue Table dictionary to be used as the table_input argument of the update_table() method.
This is needed because update_table() does not accept some read-only fields of the table dictionary
returned by the get_table() method.
"""
return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__}
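
For context, a hedged sketch of the same filtering against the raw boto3 Glue API. SkipArchive is a real parameter of update_table and the key list mirrors Glue's TableInput structure; the database and table names are made up:

import boto3

# Keys accepted by update_table's TableInput. get_table() additionally returns
# read-only fields (CreateTime, UpdateTime, CatalogId, ...) that must be dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}

glue = boto3.client("glue", region_name="eu-west-1")
table = glue.get_table(DatabaseName="analytics", Name="orders")["Table"]
table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
# SkipArchive=True updates the table without archiving the current version,
# which is what skip_archive_table_version controls in persist_docs_to_glue.
glue.update_table(DatabaseName="analytics", TableInput=table_input, SkipArchive=True)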

@available
def run_query_with_partitions_limit_catching(self, sql: str) -> str:
conn = self.connections.get_thread_connection()
cursor = conn.handle.cursor()
try:
cursor.execute(sql, catch_partitions_limit=True)
except OperationalError as e:
LOGGER.debug(f"CAUGHT EXCEPTION: {e}")
if "TOO_MANY_OPEN_PARTITIONS" in str(e):
return "TOO_MANY_OPEN_PARTITIONS"
raise e
return "SUCCESS"

@available
def format_partition_keys(self, partition_keys: List[str]) -> str:
return ", ".join([self.format_one_partition_key(k) for k in partition_keys])

@available
def format_one_partition_key(self, partition_key: str) -> str:
"""Check if partition key uses Iceberg hidden partitioning"""
hidden = re.search(r"^(hour|day|month|year)\((.+)\)", partition_key.lower())
return f"date_trunc('{hidden.group(1)}', {hidden.group(2)})" if hidden else partition_key.lower()
6 changes: 4 additions & 2 deletions dbt/include/athena/macros/adapters/persist_docs.sql
@@ -1,10 +1,12 @@
{% macro athena__persist_docs(relation, model, for_relation, for_columns) -%}
{% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %}
{% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %}
{% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %}
{% set skip_archive_table_version = not is_incremental() %}
{% if persist_relation_docs or persist_column_docs %}
{% do adapter.persist_docs_to_glue(relation,
model,
persist_relation_docs,
persist_column_docs) %}
persist_column_docs,
skip_archive_table_version=skip_archive_table_version) %}
{% endif %}
{% endmacro %}
@@ -0,0 +1,52 @@
{% macro get_partition_batches(sql) -%}
{%- set partitioned_by = config.get('partitioned_by') -%}
{%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%}
{%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%}
{% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %}

{% call statement('get_partitions', fetch_result=True) %}
select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }};
{% endcall %}

{%- set table = load_result('get_partitions').table -%}
{%- set rows = table.rows -%}
{%- set partitions = {} -%}
{% do log('TOTAL PARTITIONS TO PROCESS: ' ~ rows | length) %}
{%- set partitions_batches = [] -%}

{%- for row in rows -%}
{%- set single_partition = [] -%}
{%- for col in row -%}

{%- set column_type = adapter.convert_type(table, loop.index0) -%}
{%- if column_type == 'integer' -%}
{%- set value = col | string -%}
{%- elif column_type == 'string' -%}
{%- set value = "'" + col + "'" -%}
{%- elif column_type == 'date' -%}
{%- set value = "DATE'" + col | string + "'" -%}
{%- elif column_type == 'timestamp' -%}
{%- set value = "TIMESTAMP'" + col | string + "'" -%}
{%- else -%}
{%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
{%- endif -%}
{%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%}
{%- do single_partition.append(partition_key + '=' + value) -%}
{%- endfor -%}

{%- set single_partition_expression = single_partition | join(' and ') -%}

{%- set batch_number = (loop.index0 / athena_partitions_limit) | int -%}
{% if batch_number not in partitions %}
{% do partitions.update({batch_number: []}) %}
{% endif %}

{%- do partitions[batch_number].append('(' + single_partition_expression + ')') -%}
{%- if partitions[batch_number] | length == athena_partitions_limit or loop.last -%}
{%- do partitions_batches.append(partitions[batch_number] | join(' or ')) -%}
{%- endif -%}
{%- endfor -%}

{{ return(partitions_batches) }}

{%- endmacro %}
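
The macro's batching arithmetic, re-expressed as a hedged Python sketch (batch_partitions is a hypothetical name): row i lands in batch i // limit, so 250 distinct partitions with the default partitions_limit of 100 yield batches of 100, 100 and 50, each joined into a single where-predicate string.

from collections import defaultdict
from typing import Dict, List


def batch_partitions(partition_exprs: List[str], limit: int = 100) -> List[str]:
    batches: Dict[int, List[str]] = defaultdict(list)
    for i, expr in enumerate(partition_exprs):
        # equivalent to the macro's (loop.index0 / athena_partitions_limit) | int
        batches[i // limit].append(f"({expr})")
    return [" or ".join(group) for group in batches.values()]


exprs = [f"region='r{i}'" for i in range(250)]
assert [b.count(" or ") + 1 for b in batch_partitions(exprs)] == [100, 100, 50]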
@@ -48,3 +48,11 @@
{{ return(run_query(sql)) }}
{% endif %}
{% endmacro %}

{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%}
{% set sql -%}
alter {{ relation.type }} {{ relation.render_pure() }}
change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }}
{%- endset -%}
{{ return(run_query(sql)) }}
{% endmacro %}
@@ -22,19 +22,42 @@
{% endmacro %}

{% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{%- if not dest_columns -%}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{% endif %}
{%- endif -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{%- set insert_full -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{%- endset -%}

{%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
{%- do log('QUERY RESULT: ' ~ query_result) -%}
{%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
{% set partitions_batches = get_partition_batches(tmp_relation) %}
{% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
{%- for batch in partitions_batches -%}
{%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches|length) -%}
{%- set insert_batch_partitions -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
where {{ batch }}
);
{%- endset -%}
{%- do run_query(insert_batch_partitions) -%}
{%- endfor -%}
{%- endif -%}
SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
{%- endmacro %}
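
A hedged sketch of the control flow the macro implements: attempt one full INSERT, and only fall back to batch-by-batch inserts when Athena reports TOO_MANY_OPEN_PARTITIONS (run_insert and get_batches are hypothetical stand-ins for the statement and macro calls):

from typing import Callable, Iterable, Optional


def insert_with_partition_fallback(
    run_insert: Callable[[Optional[str]], str],
    get_batches: Callable[[], Iterable[str]],
) -> None:
    # First attempt: a single INSERT ... SELECT over the whole tmp relation.
    if run_insert(None) == "TOO_MANY_OPEN_PARTITIONS":
        # Each batch predicate bounds the number of partitions opened at once.
        for batch_predicate in get_batches():
            run_insert(batch_predicate)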


{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%}
{% call statement('get_partitions', fetch_result=True) %}
@@ -7,7 +7,7 @@

{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_grants = config.get('lf_grants') %}
{% set partitioned_by = config.get('partitioned_by', default=none) %}
{% set partitioned_by = config.get('partitioned_by') %}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
@@ -24,16 +24,18 @@

{% set to_drop = [] %}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) -%}
{%- do safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select 'SUCCESSFULLY CREATED TABLE {{ target_relation }} from scratch'" -%}
{% elif existing_relation.is_view or should_full_refresh() %}
{% do drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) -%}
{%- do safe_create_table_as(False, target_relation, sql) -%}
{% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE {{ target_relation }}'" -%}
{% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
{% do to_drop.append(tmp_relation) %}
@@ -42,7 +44,7 @@
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
{% do to_drop.append(tmp_relation) %}
{% elif strategy == 'merge' and table_type == 'iceberg' %}
@@ -67,7 +69,7 @@
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- do safe_create_table_as(True, tmp_relation, sql) -%}
{% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
{% do to_drop.append(tmp_relation) %}
{% endif %}
