Skip to content

Commit

Permalink
Merge branch 'main' into feat/athena-partitions-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko committed Aug 4, 2023
2 parents 829a9ff + 8d2a80c commit 96c0aa3
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 21 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
* Does **not** support the use of `unique_key`
* Supports [snapshots][snapshots]
* Does not support [Python models][python-models]
* Does not support [persist docs][persist-docs] for views

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
[incremental]: https://docs.getdbt.com/docs/build/incremental-models
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def open(cls, connection: Connection) -> Connection:
handle = AthenaConnection(
s3_staging_dir=creds.s3_staging_dir,
endpoint_url=creds.endpoint_url,
catalog_name=creds.database,
schema_name=creds.schema,
work_group=creds.work_group,
cursor_class=AthenaCursor,
Expand Down
78 changes: 60 additions & 18 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from mypy_boto3_glue.type_defs import (
ColumnTypeDef,
GetTableResponseTypeDef,
TableInputTypeDef,
TableTypeDef,
TableVersionTypeDef,
)
Expand Down Expand Up @@ -638,35 +639,66 @@ def persist_docs_to_glue(
model: Dict[str, Any],
persist_relation_docs: bool = False,
persist_column_docs: bool = False,
skip_archive_table_version: bool = False,
) -> None:
"""Save model/columns description to Glue Table metadata.
:param skip_archive_table_version: if True, current table version will not be archived before creating new one.
The purpose is to avoid creating redundant table version if it already was created during the same dbt run
after CREATE OR REPLACE VIEW or ALTER TABLE statements.
Every dbt run should create not more than one table version.
"""
conn = self.connections.get_thread_connection()
client = conn.handle

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table")
updated_table = {
"Name": table["Name"],
"StorageDescriptor": table["StorageDescriptor"],
"PartitionKeys": table.get("PartitionKeys", []),
"TableType": table["TableType"],
"Parameters": table.get("Parameters", {}),
"Description": table.get("Description", ""),
}
# By default, there is no need to update Glue Table
need_udpate_table = False
# Get Table from Glue
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
# Update table description
if persist_relation_docs:
table_comment = clean_sql_comment(model["description"])
updated_table["Description"] = table_comment
updated_table["Parameters"]["comment"] = table_comment

# Prepare dbt description
clean_table_description = clean_sql_comment(model["description"])
# Get current description from Glue
glue_table_description = table.get("Description", "")
# Get current description parameter from Glue
glue_table_comment = table["Parameters"].get("comment", "")
# Update description if it's different
if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
updated_table["Description"] = clean_table_description
updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
updated_table_parameters["comment"] = clean_table_description
updated_table["Parameters"] = updated_table_parameters
need_udpate_table = True

# Update column comments
if persist_column_docs:
# Process every column
for col_obj in updated_table["StorageDescriptor"]["Columns"]:
# Get column description from dbt
col_name = col_obj["Name"]
col_comment = model["columns"].get(col_name, {}).get("description")
if col_comment:
col_obj["Comment"] = clean_sql_comment(col_comment)

glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table)
if col_name in model["columns"]:
col_comment = model["columns"][col_name]["description"]
# Prepare column description from dbt
clean_col_comment = clean_sql_comment(col_comment)
# Get current column comment from Glue
glue_col_comment = col_obj.get("Comment", "")
# Update column description if it's different
if glue_col_comment != clean_col_comment:
col_obj["Comment"] = clean_col_comment
need_udpate_table = True

# Update Glue Table only if table/column description is modified.
# It prevents redundant schema version creating after incremental runs.
if need_udpate_table:
glue_client.update_table(
DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
)

@available
def list_schemas(self, database: str) -> List[str]:
Expand Down Expand Up @@ -836,6 +868,16 @@ def is_list(self, value: Any) -> bool:
"""
return isinstance(value, list)

@staticmethod
def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
"""
Prepare Glue Table dictionary to be a table_input argument of update_table() method.
This is needed because update_table() does not accept some read-only fields of table dictionary
returned by get_table() method.
"""
return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__}

@available
def run_query_with_partitions_limit_catching(self, sql: str) -> str:
conn = self.connections.get_thread_connection()
Expand Down
6 changes: 4 additions & 2 deletions dbt/include/athena/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{% macro athena__persist_docs(relation, model, for_relation, for_columns) -%}
{% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %}
{% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %}
{% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %}
{% set skip_archive_table_version = not is_incremental() %}
{% if persist_relation_docs or persist_column_docs %}
{% do adapter.persist_docs_to_glue(relation,
model,
persist_relation_docs,
persist_column_docs) %}}
persist_column_docs,
skip_archive_table_version=skip_archive_table_version) %}}
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}

{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model) %}

{% do return(to_return) %}
{%- endmaterialization %}

0 comments on commit 96c0aa3

Please sign in to comment.