From f190664e868811bd6c962cefdeb550f648019215 Mon Sep 17 00:00:00 2001 From: brunofaustino Date: Fri, 4 Aug 2023 04:40:44 -0300 Subject: [PATCH 1/2] feat: add catalog_name param to AthenaConnection handle (#358) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jérémy Guiselin <9251353+Jrmyy@users.noreply.github.com> --- dbt/adapters/athena/connections.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 40d4c855..67e2e71e 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -203,6 +203,7 @@ def open(cls, connection: Connection) -> Connection: handle = AthenaConnection( s3_staging_dir=creds.s3_staging_dir, endpoint_url=creds.endpoint_url, + catalog_name=creds.database, schema_name=creds.schema, work_group=creds.work_group, cursor_class=AthenaCursor, From 8d2a80c3ce7fb5c21111673ed194385f35b37fce Mon Sep 17 00:00:00 2001 From: Pavel Roslovets Date: Fri, 4 Aug 2023 09:52:11 +0200 Subject: [PATCH 2/2] fix: enable persist_docs for views (#337) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jérémy Guiselin <9251353+Jrmyy@users.noreply.github.com> Co-authored-by: Serhii Dimchenko <39801237+svdimchenko@users.noreply.github.com> --- README.md | 1 - dbt/adapters/athena/impl.py | 78 ++++++++++++++----- .../athena/macros/adapters/persist_docs.sql | 6 +- .../materializations/models/view/view.sql | 1 + 4 files changed, 65 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 035ed10a..ccc82e72 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,6 @@ * Does **not** support the use of `unique_key` * Supports [snapshots][snapshots] * Does not support [Python models][python-models] -* Does not support [persist docs][persist-docs] for views [seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds [incremental]: https://docs.getdbt.com/docs/build/incremental-models diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 9ef979ec..07a82769 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -15,6 +15,7 @@ from mypy_boto3_glue.type_defs import ( ColumnTypeDef, GetTableResponseTypeDef, + TableInputTypeDef, TableTypeDef, TableVersionTypeDef, ) @@ -636,35 +637,66 @@ def persist_docs_to_glue( model: Dict[str, Any], persist_relation_docs: bool = False, persist_column_docs: bool = False, + skip_archive_table_version: bool = False, ) -> None: + """Save model/columns description to Glue Table metadata. + + :param skip_archive_table_version: if True, current table version will not be archived before creating new one. + The purpose is to avoid creating redundant table version if it already was created during the same dbt run + after CREATE OR REPLACE VIEW or ALTER TABLE statements. + Every dbt run should create not more than one table version. + """ conn = self.connections.get_thread_connection() client = conn.handle with boto3_client_lock: glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config()) - table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table") - updated_table = { - "Name": table["Name"], - "StorageDescriptor": table["StorageDescriptor"], - "PartitionKeys": table.get("PartitionKeys", []), - "TableType": table["TableType"], - "Parameters": table.get("Parameters", {}), - "Description": table.get("Description", ""), - } + # By default, there is no need to update Glue Table + need_udpate_table = False + # Get Table from Glue + table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"] + # Prepare new version of Glue Table picking up significant fields + updated_table = self._get_table_input(table) + # Update table description if persist_relation_docs: - table_comment = clean_sql_comment(model["description"]) - updated_table["Description"] = table_comment - updated_table["Parameters"]["comment"] = table_comment - + # Prepare dbt description + clean_table_description = clean_sql_comment(model["description"]) + # Get current description from Glue + glue_table_description = table.get("Description", "") + # Get current description parameter from Glue + glue_table_comment = table["Parameters"].get("comment", "") + # Update description if it's different + if clean_table_description != glue_table_description or clean_table_description != glue_table_comment: + updated_table["Description"] = clean_table_description + updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"]) + updated_table_parameters["comment"] = clean_table_description + updated_table["Parameters"] = updated_table_parameters + need_udpate_table = True + + # Update column comments if persist_column_docs: + # Process every column for col_obj in updated_table["StorageDescriptor"]["Columns"]: + # Get column description from dbt col_name = col_obj["Name"] - col_comment = model["columns"].get(col_name, {}).get("description") - if col_comment: - col_obj["Comment"] = clean_sql_comment(col_comment) - - glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table) + if col_name in model["columns"]: + col_comment = model["columns"][col_name]["description"] + # Prepare column description from dbt + clean_col_comment = clean_sql_comment(col_comment) + # Get current column comment from Glue + glue_col_comment = col_obj.get("Comment", "") + # Update column description if it's different + if glue_col_comment != clean_col_comment: + col_obj["Comment"] = clean_col_comment + need_udpate_table = True + + # Update Glue Table only if table/column description is modified. + # It prevents redundant schema version creating after incremental runs. + if need_udpate_table: + glue_client.update_table( + DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version + ) @available def list_schemas(self, database: str) -> List[str]: @@ -833,3 +865,13 @@ def is_list(self, value: Any) -> bool: a list since this is complicated with purely Jinja syntax. """ return isinstance(value, list) + + @staticmethod + def _get_table_input(table: TableTypeDef) -> TableInputTypeDef: + """ + Prepare Glue Table dictionary to be a table_input argument of update_table() method. + + This is needed because update_table() does not accept some read-only fields of table dictionary + returned by get_table() method. + """ + return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__} diff --git a/dbt/include/athena/macros/adapters/persist_docs.sql b/dbt/include/athena/macros/adapters/persist_docs.sql index 2a7b8748..78503ba9 100644 --- a/dbt/include/athena/macros/adapters/persist_docs.sql +++ b/dbt/include/athena/macros/adapters/persist_docs.sql @@ -1,10 +1,12 @@ {% macro athena__persist_docs(relation, model, for_relation, for_columns) -%} {% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %} {% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %} - {% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %} + {% set skip_archive_table_version = not is_incremental() %} + {% if persist_relation_docs or persist_column_docs %} {% do adapter.persist_docs_to_glue(relation, model, persist_relation_docs, - persist_column_docs) %}} + persist_column_docs, + skip_archive_table_version=skip_archive_table_version) %}} {% endif %} {% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/view/view.sql b/dbt/include/athena/macros/materializations/models/view/view.sql index 1cf3b337..3b1a4a89 100644 --- a/dbt/include/athena/macros/materializations/models/view/view.sql +++ b/dbt/include/athena/macros/materializations/models/view/view.sql @@ -2,6 +2,7 @@ {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} {% set target_relation = this.incorporate(type='view') %} + {% do persist_docs(target_relation, model) %} {% do return(to_return) %} {%- endmaterialization %}