diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index e3bc3ae0b..6de132c08 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -300,3 +300,20 @@ def valid_incremental_strategies(self): def debug_query(self): """Override for DebugTask method""" self.execute("select 1 as id") + + @available.parse_none + def stack_tables(self, tables_list: List[agate.Table]) -> agate.Table: + """ + Given a list of agate_tables with the same column names & types + return a single unioned agate table. + """ + non_empty_tables = [table for table in tables_list if len(table.rows) > 0] + + if len(non_empty_tables) == 0: + return tables_list[0] + else: + return ( + agate.TableSet(non_empty_tables, keys=range(len(non_empty_tables))) + .merge() + .exclude(["group"]) + ) diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index ff94abc33..f91839345 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -48,6 +48,14 @@ def is_dynamic_table(self) -> bool: def DynamicTable(cls) -> str: return str(SnowflakeRelationType.DynamicTable) + @property + def is_snowpipe(self) -> bool: + return self.type == SnowflakeRelationType.SnowPipe + + @classproperty + def SnowPipe(cls) -> str: + return str(SnowflakeRelationType.SnowPipe) + @classproperty def get_relation_type(cls) -> Type[SnowflakeRelationType]: return SnowflakeRelationType diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py index 75195f9a3..533c8dffb 100644 --- a/dbt/adapters/snowflake/relation_configs/policies.py +++ b/dbt/adapters/snowflake/relation_configs/policies.py @@ -10,6 +10,8 @@ class SnowflakeRelationType(StrEnum): CTE = "cte" External = "external" DynamicTable = "dynamic_table" + ExternalTable = "external_table" + SnowPipe = "snowpipe" class SnowflakeIncludePolicy(Policy): diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 0bf7b7d1b..44249a4b6 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -124,11 +124,47 @@ {%- set max_total_results = max_results_per_iter * max_iter -%} {%- set sql -%} - show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} + show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + "kind", + "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); {%- endset -%} {%- set result = run_query(sql) -%} + {%- set sql_extab -%} + show external tables in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + 'external_table' as "kind", + 'N' as "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); + {%- endset -%} + + {%- set result_extab = run_query(sql_extab) -%} + + {%- set sql_pipes -%} + show pipes in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + 'snowpipe' as "kind", + 'N' as "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); + {%- endset -%} + + {%- set result_pipes = run_query(sql_pipes) -%} + {%- set n = (result | length) -%} {%- set watermark = namespace(table_name=result.columns[1].values()[-1]) -%} {%- set paginated = namespace(result=[]) -%} @@ -147,8 +183,10 @@ {% endif %} {%- set all_results_array = [result] + paginated.result -%} - {%- set result = result.merge(all_results_array) -%} - {%- do return(result) -%} + + {%- set result_stacked = adapter.stack_tables([result, result_pipes, result_extab]) -%} + + {%- do return(result_stacked) -%} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql new file mode 100644 index 000000000..3a59474d0 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -0,0 +1,87 @@ +{% macro snowflake__create_external_table(relation, columns) %} + + {% set snowpipe = config.get('snowpipe') %} + + {% if snowpipe %} + + {{ snowflake_get_build_snowpipe_sql(relation, columns) }} + + {% else %} + + {{ get_create_external_table_sql(relation, columns) }} + + {% endif %} + +{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} +{# This assumes you have already created an external stage #} + + +{% endmacro %} + +{% macro get_create_external_table_sql(relation, columns) %} + + {% set file_format = config.get('file_format') %} + {% set location = config.get('location') %} + {% set partitions = config.get('partitions') %} + {% set partition_map = partitions|map(attribute='name')|join(', ') %} + + {%- set is_csv = is_csv(file_format) -%} + + create or replace external table {{ relation }} + {%- if columns or partitions or infer_schema -%} + ( + {%- if partitions -%}{%- for partition in partitions %} + {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} + {%- endfor -%}{%- endif -%} + {%- if not infer_schema -%} + {%- for column in columns %} + {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} + {%- set column_alias -%} + {%- if 'alias' in column and column.quote -%} + {{adapter.quote(column.alias)}} + {%- elif 'alias' in column -%} + {{column.alias}} + {%- else -%} + {{column_quoted}} + {%- endif -%} + {%- endset %} + {%- set col_expression -%} + {%- if column.expression -%} + {{column.expression}} + {%- else -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endif -%} + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + {% else %} + {%- for column in columns_infer %} + {%- set col_expression -%} + {%- set col_id = 'value:' ~ column[0] -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) + {{- ',' if not loop.last -}} + {% endfor %} + {%- endif -%} + ) + {%- endif -%} + {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} + location = {{location}} {# stage #} + {% if auto_refresh in (true, false) -%} + auto_refresh = {{auto_refresh}} + {%- endif %} + {% if aws_sns_topic -%} + aws_sns_topic = '{{aws_sns_topic}}' + {%- endif %} + {% if table_format | lower == "delta" %} + refresh_on_create = false + {% endif %} + {% if pattern -%} pattern = '{{pattern}}' {%- endif %} + {% if integration -%} integration = '{{integration}}' {%- endif %} + file_format = {{file_format}} + {% if table_format -%} table_format = '{{table_format}}' {%- endif %} + +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql new file mode 100644 index 000000000..5a4e1bd52 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -0,0 +1,99 @@ +{% macro snowflake__refresh_external_table(relation) %} + + {% set auto_refresh = config.get('auto_refresh', false) %} + {% set manual_refresh = not auto_refresh %} + + {% set partitions = config.get('partitions', none) %} + + {% set table_format = config.get('table_format', none) %} + {% if table_format %} + {% set is_delta = table_format | lower == "delta" %} + {% endif %} + + {# snowpipe as well #} + {% set snowpipe = config.get('snowpipe', none) %} + {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %} + + {% set relation_type = 'pipe' if snowpipe is not none else 'external table' %} + + {% if manual_refresh or auto_ingest %} + + {% set ddl %} + begin; + alter {{ relation_type }} {{ relation }} refresh; + commit; + {% endset %} + + {% do return([ddl]) %} + + {% else %} + + {% do return([]) %} + + {% endif %} + +{% endmacro %} + +{% macro is_csv(file_format) %} + +{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html: + +Important: The external table does not inherit the file format, if any, in the +stage definition. You must explicitly specify any file format options for the +external table using the FILE_FORMAT parameter. + +Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior, +you should only specify one or the other when creating an external table. + +#} + + {% set ff_ltrimmed = file_format|lower|replace(' ','') %} + + {% if 'type=' in ff_ltrimmed %} + + {% if 'type=csv' in ff_ltrimmed %} + + {{return(true)}} + + {% else %} + + {{return(false)}} + + {% endif %} + + {% else %} + + {% set ff_standardized = ff_ltrimmed + | replace('(','') | replace(')','') + | replace('format_name=','') %} + {% set fqn = ff_standardized.split('.') %} + + {% if fqn | length == 3 %} + {% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %} + {% elif fqn | length == 2 %} + {% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %} + {% else %} + {% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %} + {% endif %} + + {% call statement('get_file_format', fetch_result = True) %} + show file formats in {{ff_database}}.{{ff_schema}} + {% endcall %} + + {% set ffs = load_result('get_file_format').table %} + + {% for ff in ffs %} + + {% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %} + + {{return(true)}} + + {% endif %} + + {% endfor %} + + {{return(false)}} + + {% endif %} + +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql new file mode 100644 index 000000000..0a044184b --- /dev/null +++ b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql @@ -0,0 +1,96 @@ +{% macro snowflake_create_empty_table(relation, columns) %} + + create or replace table {{ relation }} ( + {% if columns|length == 0 %} + value variant, + {% else -%} + {%- for column in columns -%} + {{column.name}} {{column.data_type}}, + {% endfor -%} + {% endif %} + metadata_filename varchar, + metadata_file_row_number bigint, + metadata_file_last_modified timestamp, + _dbt_copied_at timestamp + ); + +{% endmacro %} + +{% macro snowflake_get_copy_sql(relation, columns, explicit_transaction=false) %} +{# This assumes you have already created an external stage #} + + {% set location = config.get('location') %} + {% set file_format = config.get('file_format') %} + + {% set pattern = config.get('pattern') %} + {%- set is_csv = is_csv(file_format) %} + + {% set snowpipe = config.get('snowpipe', none) %} + {%- set copy_options = snowpipe.get('copy_options', none) -%} + + {%- if explicit_transaction -%} begin; {%- endif %} + + copy into {{ relation }} + from ( + select + {% if columns|length == 0 %} + $1::variant as value, + {% else -%} + {%- for column in columns -%} + {%- set col_expression -%} + {%- if is_csv -%}nullif(${{loop.index}},''){# special case: get columns by ordinal position #} + {%- else -%}nullif($1:{{column.name}},''){# standard behavior: get columns by name #} + {%- endif -%} + {%- endset -%} + {{col_expression}}::{{column.data_type}} as {{column.name}}, + {% endfor -%} + {% endif %} + metadata$filename::varchar as metadata_filename, + metadata$file_row_number::bigint as metadata_file_row_number, + metadata$file_last_modified::timestamp as metadata_file_last_modified, + metadata$start_scan_time::timestamp as _dbt_copied_at + from {{location}} {# stage #} + ) + file_format = {{file_format}} + {% if pattern -%} pattern = '{{pattern}}' {%- endif %} + {% if copy_options %} {{copy_options}} {% endif %}; + + {% if explicit_transaction -%} commit; {%- endif -%} + +{% endmacro %} + + +{% macro snowflake_create_snowpipe(relation, columns) %} + + {% set snowpipe = config.get('snowpipe', none) %} + +{# https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html #} + create or replace pipe {{ relation }} + {% if snowpipe.auto_ingest -%} auto_ingest = {{snowpipe.auto_ingest}} {%- endif %} + {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} + {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} + {% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %} + as {{ snowflake_get_copy_sql(relation, columns) }} + +{% endmacro %} + +{% macro snowflake_refresh_snowpipe(relation) %} + + {% set snowpipe = config.get('snowpipe', none) %} + {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %} + + {% if auto_ingest is true %} + + {% do return([]) %} + + {% else %} + + {% set ddl %} + alter pipe {{ relation }} refresh + {% endset %} + + {{ return([ddl]) }} + + {% endif %} + +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql new file mode 100644 index 000000000..fa8726259 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql @@ -0,0 +1,9 @@ +{% macro snowflake_get_build_snowpipe_sql(relation, columns) %} + + {{ snowflake_create_empty_table(relation, columns) }} + + {{ snowflake_get_copy_sql(relation, columns, explicit_transaction) }} + + {{ snowflake_create_snowpipe(relation, columns) }} + +{% endmacro %}