Skip to content

Commit

Permalink
add external table
Browse files Browse the repository at this point in the history
  • Loading branch information
dataders committed May 15, 2024
1 parent 3ac7f75 commit d2782dc
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 0 deletions.
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum):
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"
ExternalTable = "external_table"


class SnowflakeIncludePolicy(Policy):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{% macro snowflake__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
{%- set partitions = external.partitions -%}
{%- set infer_schema = external.infer_schema -%}

{% if infer_schema %}
{% set query_infer_schema %}
select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') )
{% endset %}
{% if execute %}
{% set columns_infer = run_query(query_infer_schema) %}
{% endif %}
{% endif %}

{%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%}

{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
{# This assumes you have already created an external stage #}
create or replace external table {{source(source_node.source_name, source_node.name)}}
{%- if columns or partitions or infer_schema -%}
(
{%- if partitions -%}{%- for partition in partitions %}
{{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}}
{%- endfor -%}{%- endif -%}
{%- if not infer_schema -%}
{%- for column in columns %}
{%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
{%- set column_alias -%}
{%- if 'alias' in column and column.quote -%}
{{adapter.quote(column.alias)}}
{%- elif 'alias' in column -%}
{{column.alias}}
{%- else -%}
{{column_quoted}}
{%- endif -%}
{%- endset %}
{%- set col_expression -%}
{%- if column.expression -%}
{{column.expression}}
{%- else -%}
{%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%}
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
{%- endif -%}
{%- endset %}
{{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
{{- ',' if not loop.last -}}
{% endfor %}
{% else %}
{%- for column in columns_infer %}
{%- set col_expression -%}
{%- set col_id = 'value:' ~ column[0] -%}
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
{%- endset %}
{{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}})
{{- ',' if not loop.last -}}
{% endfor %}
{%- endif -%}
)
{%- endif -%}
{% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %}
location = {{external.location}} {# stage #}
{% if external.auto_refresh in (true, false) -%}
auto_refresh = {{external.auto_refresh}}
{%- endif %}
{% if external.aws_sns_topic -%}
aws_sns_topic = '{{external.aws_sns_topic}}'
{%- endif %}
{% if external.table_format | lower == "delta" %}
refresh_on_create = false
{% endif %}
{% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %}
{% if external.integration -%} integration = '{{external.integration}}' {%- endif %}
file_format = {{external.file_format}}
{% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %}
{% endmacro %}
125 changes: 125 additions & 0 deletions dbt/include/snowflake/macros/relations/external_table/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{% macro snowflake__create_external_schema(source_node) %}

{% set schema_exists_query %}
show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1;
{% endset %}
{% if execute %}
{% set schema_exists = run_query(schema_exists_query)|length > 0 %}
{% else %}
{% set schema_exists = false %}
{% endif %}

{% if schema_exists %}
{% set ddl %}
select 'Schema {{ source_node.schema }} exists' from dual;
{% endset %}
{% else %}
{% set fqn %}
{% if source_node.database %}
{{ source_node.database }}.{{ source_node.schema }}
{% else %}
{{ source_node.schema }}
{% endif %}
{% endset %}

{% set ddl %}
create schema if not exists {{ fqn }};
{% endset %}
{% endif %}

{% do return(ddl) %}

{% endmacro %}

{% macro snowflake__refresh_external_table(source_node) %}

{% set external = source_node.external %}
{% set snowpipe = source_node.external.get('snowpipe', none) %}

{% set auto_refresh = external.get('auto_refresh', false) %}
{% set partitions = external.get('partitions', none) %}
{% set delta_format = (external.table_format | lower == "delta") %}

{% set manual_refresh = not auto_refresh %}

{% if manual_refresh %}

{% set ddl %}
begin;
alter external table {{source(source_node.source_name, source_node.name)}} refresh;
commit;
{% endset %}

{% do return([ddl]) %}

{% else %}

{% do return([]) %}

{% endif %}

{% endmacro %}

{% macro is_csv(file_format) %}

{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html:

Important: The external table does not inherit the file format, if any, in the
stage definition. You must explicitly specify any file format options for the
external table using the FILE_FORMAT parameter.

Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior,
you should only specify one or the other when creating an external table.

#}

{% set ff_ltrimmed = file_format|lower|replace(' ','') %}

{% if 'type=' in ff_ltrimmed %}

{% if 'type=csv' in ff_ltrimmed %}

{{return(true)}}

{% else %}

{{return(false)}}

{% endif %}

{% else %}

{% set ff_standardized = ff_ltrimmed
| replace('(','') | replace(')','')
| replace('format_name=','') %}
{% set fqn = ff_standardized.split('.') %}

{% if fqn | length == 3 %}
{% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %}
{% elif fqn | length == 2 %}
{% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %}
{% else %}
{% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %}
{% endif %}

{% call statement('get_file_format', fetch_result = True) %}
show file formats in {{ff_database}}.{{ff_schema}}
{% endcall %}

{% set ffs = load_result('get_file_format').table %}

{% for ff in ffs %}

{% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %}

{{return(true)}}

{% endif %}

{% endfor %}

{{return(false)}}

{% endif %}

{% endmacro %}

0 comments on commit d2782dc

Please sign in to comment.