Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spike external table materialization for sources #1043

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum):
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"
ExternalTable = "external_table"


class SnowflakeIncludePolicy(Policy):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{% macro snowflake__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
{%- set partitions = external.partitions -%}

{# {{ log('XXX: columns: ' ~ columns, info=True) }}
{{ log('XXX: partitions: ' ~ columns, info=True) }}
{% set partition_map = partitions|map(attribute='name')|join(', ') %}
{{ log('XXX: partition_map: ' ~ partition_map, info=True) }} #}




{%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%}

{%- set relation = api.Relation.create(
database=source_node.database, schema=source_node.schema, identifier=source_node.name,
type='external_table') -%}

{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
{# This assumes you have already created an external stage #}

create or replace external table {{ relation }}
(

{%- for column in columns %}
{{ log('column: ' ~ column.name, info=True) }}
{%- set column_alias = column.name %}
{%- set col_expression -%}
{%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%}
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
{%- endset %}
{{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
{{- ',' if not loop.last -}}
{% endfor %}

)
location = {{external.location}} {# stage #}

file_format = {{external.file_format}}

{% endmacro %}
125 changes: 125 additions & 0 deletions dbt/include/snowflake/macros/relations/external_table/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
{% macro snowflake__create_external_schema(source_node) %}

{% set schema_exists_query %}
show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1;
{% endset %}
{% if execute %}
{% set schema_exists = run_query(schema_exists_query)|length > 0 %}
{% else %}
{% set schema_exists = false %}
{% endif %}

{% if schema_exists %}
{% set ddl %}
select 'Schema {{ source_node.schema }} exists' from dual;
{% endset %}
{% else %}
{% set fqn %}
{% if source_node.database %}
{{ source_node.database }}.{{ source_node.schema }}
{% else %}
{{ source_node.schema }}
{% endif %}
{% endset %}

{% set ddl %}
create schema if not exists {{ fqn }};
{% endset %}
{% endif %}

{% do return(ddl) %}

{% endmacro %}

{% macro snowflake__refresh_external_table(source_node) %}

{% set external = source_node.external %}
{% set snowpipe = source_node.external.get('snowpipe', none) %}

{% set auto_refresh = external.get('auto_refresh', false) %}
{% set partitions = external.get('partitions', none) %}
{% set delta_format = (external.table_format | lower == "delta") %}

{% set manual_refresh = not auto_refresh %}

{% if manual_refresh %}

{% set ddl %}
begin;
alter external table {{source(source_node.source_name, source_node.name)}} refresh;
commit;
{% endset %}

{% do return([ddl]) %}

{% else %}

{% do return([]) %}

{% endif %}

{% endmacro %}

{% macro is_csv(file_format) %}

{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html:

Important: The external table does not inherit the file format, if any, in the
stage definition. You must explicitly specify any file format options for the
external table using the FILE_FORMAT parameter.

Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior,
you should only specify one or the other when creating an external table.

#}

{% set ff_ltrimmed = file_format|lower|replace(' ','') %}

{% if 'type=' in ff_ltrimmed %}

{% if 'type=csv' in ff_ltrimmed %}

{{return(true)}}

{% else %}

{{return(false)}}

{% endif %}

{% else %}

{% set ff_standardized = ff_ltrimmed
| replace('(','') | replace(')','')
| replace('format_name=','') %}
{% set fqn = ff_standardized.split('.') %}

{% if fqn | length == 3 %}
{% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %}
{% elif fqn | length == 2 %}
{% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %}
{% else %}
{% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %}
{% endif %}

{% call statement('get_file_format', fetch_result = True) %}
show file formats in {{ff_database}}.{{ff_schema}}
{% endcall %}

{% set ffs = load_result('get_file_format').table %}

{% for ff in ffs %}

{% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %}

{{return(true)}}

{% endif %}

{% endfor %}

{{return(false)}}

{% endif %}

{% endmacro %}
Loading