Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dbt 1.8 support #135

Closed
wants to merge 14 commits into from
3 changes: 3 additions & 0 deletions .changes/unreleased/Added-20240822-111700.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: dbt 1.7 support with tests and refactoring
time: 2024-08-22T11:17:00.770973+01:00
28 changes: 15 additions & 13 deletions dbt/adapters/firebolt/connections.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from multiprocessing.context import SpawnContext
from typing import Generator, Optional, Tuple, Union

import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import (
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
AdapterResponse,
Connection,
Credentials,
QueryComment,
)
from dbt.events import AdapterLogger # type: ignore
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.sql.connections import SQLConnectionManager
from dbt_common.exceptions import (
DbtConfigError,
DbtRuntimeError,
NotImplementedError,
)
from firebolt.client import DEFAULT_API_URL
from firebolt.client.auth import Auth, ClientCredentials, UsernamePassword
from firebolt.db import ARRAY, DECIMAL
Expand Down Expand Up @@ -45,13 +49,13 @@ def __post_init__(self) -> None:
# are provided instead
if not self.user and not self.password:
if not self.client_id or not self.client_secret:
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
'Either user and password or client_id and client_secret'
' must be provided'
)
else:
if self.client_id or self.client_secret:
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
'Either user and password or client_id and client_secret'
' must be provided'
)
Expand Down Expand Up @@ -99,13 +103,13 @@ class FireboltConnectionManager(SQLConnectionManager):

TYPE = 'firebolt'

def __init__(self, profile: AdapterRequiredConfig):
def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext):
# Query comment in appent mode only
# This allows clearer view of queries in query_history
if not hasattr(profile, 'query_comment'):
setattr(profile, 'query_comment', QueryComment())
profile.query_comment.append = True
super().__init__(profile)
super().__init__(profile, mp_context)

def __str__(self) -> str:
return 'FireboltConnectionManager()'
Expand Down Expand Up @@ -181,9 +185,7 @@ def commit(self) -> None:

def cancel(self, connection: Connection) -> None:
"""Cancel the last query on the given connection."""
raise dbt.exceptions.NotImplementedError(
'`cancel` is not implemented for this adapter!'
)
raise NotImplementedError('`cancel` is not implemented for this adapter!')

@classmethod
def data_type_code_to_name( # type: ignore[override] # FIR-29423
Expand Down
30 changes: 24 additions & 6 deletions dbt/adapters/firebolt/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
from typing import Any, List, Mapping, Optional, Union

import agate
from dbt.adapters.base import available
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.impl import ConstraintSupport
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.sql import SQLAdapter
from dbt.contracts.graph.nodes import ConstraintType
from dbt.dataclass_schema import ValidationError, dbtClassMixin
from dbt.exceptions import (
from dbt.adapters.capability import (
Capability,
CapabilityDict,
CapabilitySupport,
Support,
)
from dbt.adapters.protocol import AdapterConfig
from dbt.adapters.sql.impl import SQLAdapter
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.dataclass_schema import ValidationError, dbtClassMixin
from dbt_common.exceptions import (
CompilationError,
DbtRuntimeError,
NotImplementedError,
Expand Down Expand Up @@ -114,6 +121,17 @@ class FireboltAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
}

_capabilities: CapabilityDict = CapabilityDict(
{
Capability.SchemaMetadataByRelations: CapabilitySupport(
support=Support.Full
),
Capability.TableLastModifiedMetadata: CapabilitySupport(
support=Support.Unsupported
),
}
)

@classmethod
def is_cancelable(cls) -> bool:
return False
Expand Down
24 changes: 21 additions & 3 deletions dbt/adapters/firebolt/relation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from dataclasses import dataclass, field
from typing import Optional
from typing import Dict, FrozenSet, Optional

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import Policy, RelationType
from dbt.adapters.relation_configs.config_base import RelationConfigBase
from dbt_common.exceptions import DbtRuntimeError


@dataclass
Expand Down Expand Up @@ -30,6 +32,22 @@ class FireboltRelation(BaseRelation):
quote_character: str = '"'
is_delta: Optional[bool] = None
information: Optional[str] = None
relation_configs: Dict[RelationType, RelationConfigBase] = field(
default_factory=lambda: {}
)
# list relations that can be renamed (e.g. `RENAME my_relation TO my_new_name;`)
renameable_relations: FrozenSet[RelationType] = field(
default_factory=lambda: frozenset({})
)
# list relations that can be atomically replaced
# (e.g. `CREATE OR REPLACE my_relation..` versus `DROP` and `CREATE`)
replaceable_relations: FrozenSet[RelationType] = field(
default_factory=lambda: frozenset(
{
RelationType.View,
}
)
)

def render(self) -> str:
if self.include_policy.database and self.include_policy.schema:
Expand Down
Empty file.
84 changes: 0 additions & 84 deletions dbt/include/firebolt/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -183,90 +183,6 @@
{{ return(info_table) }}
{% endmacro %}


{% macro firebolt__create_table_as(temporary,
relation,
select_sql,
language='sql') -%}
{# Create table using CTAS
Args:
temporary (bool): Unused, included so macro signature matches
that of dbt's default macro
relation (dbt relation/dict)
select_sql (string): The SQL query that will be used to generate
the internal query of the CTAS
language (string): sql or python models. Firebolt only supports sql.
#}
{%- if language == 'python' -%}
{{ exceptions.raise_compiler_error("Firebolt does not currently support "
"Python models.") }}
{%- elif language not in ['python', 'sql'] -%}
{{ exceptions.raise_compiler_error("Unexpected language parameter supplied: %s "
"Must be either 'sql' or 'python'." % (language)) }}
{%- endif -%}

{%- set table_type = config.get('table_type', default='dimension') | upper -%}
{%- set primary_index = config.get('primary_index') -%}
{%- set incremental_strategy = config.get('incremental_strategy') -%}
{%- set partitions = config.get('partition_by') %}

CREATE {{ table_type }} TABLE IF NOT EXISTS {{ relation }}
{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(select_sql) }}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
{{ adapter.dispatch('get_column_names', 'dbt')() }}
)
{%- set select_sql = get_select_subquery(select_sql) %}
{% endif %}
{%- if primary_index %}
PRIMARY INDEX
{% if primary_index is iterable and primary_index is not string %}
{{ primary_index | join(', ') }}
{%- else -%}
{{ primary_index }}
{%- endif -%}
{%- endif -%}
{% if partitions %}
PARTITION BY
{% if partitions is iterable and partitions is not string %}
{{ partitions | join(', ') }}
{%- else -%}
{{ partitions }}
{%- endif -%}
{%- endif %}
{%- if not contract_config.enforced %}
AS (
{% endif -%}
{{ select_sql }}
{% if not contract_config.enforced -%}
)
{%- endif -%}
{% endmacro %}


{% macro firebolt__create_view_as(relation, select_sql) %}
{#-
Return SQL string to create view.
Args:
relation (dict): dbt relation
select_sql (string): The SQL query that will be used to generate
the internal query of the CTAS
#}

CREATE OR REPLACE VIEW {{ relation.identifier }}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(select_sql) }}
{%- endif %}
AS (
{{ select_sql }}
)
{% endmacro %}


{% macro firebolt__truncate_relation(relation) -%}
{#
Truncate relation. Actual macro is drop_relation in ./adapters/relation.sql.
Expand Down
98 changes: 89 additions & 9 deletions dbt/include/firebolt/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,23 +1,103 @@
{# This is for building docs. Right now it's an incomplete description of
the columns (for instance, `is_nullable` is missing) but more could be added later. #}

{% macro firebolt__get_catalog(information_schemas, schemas) -%}
{%- call statement('catalog', fetch_result=True) %}
{% macro firebolt__get_catalog(information_schema, schemas) -%}

{% set query %}
with tables as (
{{ firebolt__get_catalog_tables_sql(information_schema) }}
{{ firebolt__get_catalog_schemas_where_clause_sql(schemas) }}
),
columns as (
{{ firebolt__get_catalog_columns_sql(information_schema) }}
{{ firebolt__get_catalog_schemas_where_clause_sql(schemas) }}
)
{{ firebolt__get_catalog_results_sql() }}
{%- endset -%}

{{ return(run_query(query)) }}

{%- endmacro %}

{% macro firebolt__get_catalog_relations(information_schema, relations) -%}

{% set query %}
with tables as (
{{ firebolt__get_catalog_tables_sql(information_schema) }}
{{ firebolt__get_catalog_relations_where_clause_sql(relations) }}
),
columns as (
{{ firebolt__get_catalog_columns_sql(information_schema) }}
{{ firebolt__get_catalog_relations_where_clause_sql(relations) }}
)
{{ firebolt__get_catalog_results_sql() }}
{%- endset -%}

{{ return(run_query(query)) }}

{%- endmacro %}


{% macro firebolt__get_catalog_tables_sql(information_schema) -%}
SELECT
tbls.table_catalog AS table_database,
tbls.table_schema as table_schema,
table_type,
tbls.table_name as table_name,
cols.column_name as column_name,
cols.data_type AS column_type,
CASE
WHEN table_type = 'VIEW' THEN 'VIEW'
ELSE 'TABLE'
END AS relation_type,
cols.ordinal_position as column_index
END AS relation_type
FROM
information_schema.tables tbls
JOIN information_schema.columns cols USING (table_name)
{% endcall -%}
{{ return(load_result('catalog').table) }}
{%- endmacro %}


{% macro firebolt__get_catalog_columns_sql(information_schema) -%}
select
table_catalog as "table_database",
table_schema as "table_schema",
table_name as "table_name",
column_name as "column_name",
ordinal_position as "column_index",
data_type as "column_type"
from information_schema.columns
{%- endmacro %}

{% macro firebolt__get_catalog_results_sql() -%}
SELECT *
FROM tables
JOIN columns USING ("table_database", "table_schema", "table_name")
ORDER BY "column_index"
{%- endmacro %}


{% macro firebolt__get_catalog_schemas_where_clause_sql(schemas) -%}
WHERE ({%- for schema in schemas -%}
UPPER("table_schema") = UPPER('{{ schema }}'){%- if not loop.last %} OR {% endif -%}
{%- endfor -%})
{%- endmacro %}


{% macro firebolt__get_catalog_relations_where_clause_sql(relations) -%}
WHERE (
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
UPPER("table_schema") = UPPER('{{ relation.schema }}')
AND UPPER("table_name") = UPPER('{{ relation.identifier }}')
)
{% elif relation.schema %}
(
UPPER("table_schema") = UPPER('{{ relation.schema }}')
)
{% else %}
{% do exceptions.raise_compiler_error(
'`get_catalog_relations` requires a list of relations, each with a schema'
) %}
{% endif %}

{%- if not loop.last %} OR {% endif -%}
{%- endfor -%}
)
{%- endmacro %}
Loading