feat: Insert overwrite incremental model (#67)
* Pulled out JDBC connection code and replaced it with our DB-API connection. Minor change to field values in profiles.yml to match internal nomenclature: host -> api_endpoint.

* Hand merged main.

* dbt debug shows we're making connections. Seeding is failing with type errors.

* Can connect to the database and dbt run, but dbt seed isn't currently working. I think that's actually an issue in main.

* add dbt debugging

* Little bit of cleanup. Mostly removed a couple of references to and uses of jaydebeapi, Java, and the JDBC.

* deprecate JDBC java type conversion hacks

* Update setup.py

* Updated a couple of variable names and the setup.py file.

* Updated account and engine to account_name and engine_name in connections.py.

* Updated method of getting credentials in connection.py.

* Updated .dbtspec test file to use only env variables. connections.py was still using hosts instead of api_endpoint.

* Bumped version number and dbt dependency.

* Minor edits to catalog.sql for cleanup.

* Changed env variable names in tests/integration/firebolt.dbtspec for ease of testing.

* Bumped version number.

* Bumped version number.

* Fixed incorrect version number.

* Bumped version number.

* Removed jaydebeapi from the installation requirements.

* Removed unnecessary init file and renamed tests directory.

* Added firebolt-sdk >= 0.3.2 as a required install.

* Removed get_status() from connections.py.

* Updated .dbtspec pytest file to get schema name from environment.

* Found and removed an extraneous mention of JDBC in an error msg.

* Removed EngineOfflineException from connections.py.

* DROP CASCADE now works and dropped table is recreated. Bonus side effect: dbt debug runs without errors or skipped steps.

* Changed SELECT schema_exists to SELECT 1. Since Firebolt doesn't currently support schemas, it's always true. Pytest basecase now passes.

* Bumped the version number.

* Updated changelog.

* Added dummy firebolt__snapshot_string_as_time macro for testing.

* Added firebolt__snapshot_string_as_time macro to adapters.sql. Moved some tests around in firebolt.dbtspec.

* More moving values around in firebolt.dbtspec file.

* Added incremental models from dbt/core/dbt/include/global_project/macros/materializations/models/incremental.

* Added line break in adapters.sql, created is_incremental.sql.

* Minor changes for output while troubleshooting failing pytest.

* Rewrote query in catalog.sql. Log statements added and removed, firebolt.dbtspec edited.

* Updating merge to add two macros: get_merge_sql and get_delete_insert_merge_sql.

* Edited firebolt.dbtspec file to better separate tests.

* Removed column_comment from catalog.sql.

* Removed some extraneous log statements.

* Little code cleanup in adapters.sql.

* Style tweaks.

* Updated build plan on external tables so they default to drop table.

* Added exceptions for missing or misspelled fields in external tables.

* Since name is a required field for columns in external tables, removed some logic so we now error out on missing column names.

* Updated changelog and version number.

* Small edit to error messages for missing fields on external tables.

* Changed default behavior on external tables: Firebolt external tables now only drop and rebuild if explicitly told to via a variable on dbt run-operation stage_external_sources.

* Added allowable prefixes to PR title description.

* Updated firebolt.dbtspec file.

* Formatted integration_tests.yml to match that of code-check.yml. Added integration tests to pull_request.yml.

* Now, actually added integration-tests to pull_request.yml. Fixed directory path issue in integration-tests.yml.

* Some reformatting for legibility. Added docstrings.

* Reordered index table names, tiny edits in error msgs, renamed type field to index_type for clarity.

* Now passing secrets from pull_requests.yml to integration_tests.yml.

* Commenting out integration tests from PR workflow.

* Fixed misspelled variable FIREBOLT_USERNAME.

* Edited pull_request.yml to use this branch for testing. Will revert later.

* Bumped version number to 1.0.2.

* Reverted pull_request.yml so that future integration tests will run from the main branch.

* Bumped version number and added a little text to the changelog.

* Cleaned up some missing index_type variable names. Bumped required python-sdk version to 0.5.2.

* Cleanup on switch to index_type. Edited types for key and join columns, and added upper() to ensure match in errors in FireboltIndexConfig.

* Moved todos out of comments and into dedicated Confluence doc. And yes, I know that's not actually relevant to the job at hand.

* Added logic to deal with having a list of keys in aggregate indexes, and for correctly naming the indexes if there is a list of keys.

* Edited many call statements to make dbt log output easier to read.

* Style cleanup.

* Cleanup of incremental.sql

* Updated changelog and bumped version number.

* Added line to pull_request_template.md.

* Removed extraneous log and print calls, as well as unused function get_columns().

* Fixed regression in github workflow.

* Regressed some code in adapters.sql.

* Regressed changes in catalog.sql that will instead live in the passing-base-integration-test branch.

* Changelog version number hadn't been bumped.

* Optimized language in changelog.

* Removed extraneous changes not related to logic and moved them to a different branch.

* Removed extraneous/spurious merge.sql.

* Added merge.sql back in, although only get_delete_insert_merge_sql macro.

* removed snapshot_string_as_time macro from adapters.sql.

* Merge.sql had mis-formatted sql.

* Updated readme to include append-only in supported features section.

* Removed any chance of DELETE FROM being accidentally called during incremental updates.

* Removing extra calls to drop_relation_if_exists in incremental.

* Added relational to adapters.

* Trying to replace drop_if.

* Finally switched out create_table_as to create_view_as.

* Added code to delete all temp and backup relations at the end of an incremental run. Renamed some relation names to make code clearer.

* Changed create views back to create tables in incremental.

* Fixed an error with an undeclared variable, table, in relation.sql.

* Clean up, add some comments, rename variable for more clarity.

* Added a strategies sql module that contains all the code for various incremental strategies and a conditional branch to choose one.

* Fixed typo in call to get_insert_sql.

* Fixed error message.

* Moved drop_relation from adapters.sql to relation.sql.

* Removed mypy flags from setup.cfg.

* Undid last commit.

* Rewrote a lot of incremental.sql to remove backup relations and use a view for the intermediate table, which holds the new records. Minor edits otherwise, to variable names and comments.

* Significant changes in incremental.sql.

* Updated the changelog to reflect a breaking change from the last merge on the columns of aggregating and join indexes.

* Minor cleanup of comments in incremental.sql.

* Final(?) changes to incremental append-only. Skipped creation of intermediate table, allowing for removal of extra CTAS by skipping table rename at end. A little bit of code cleanup. Now properly registering all created and dropped tables with dbt's internal database. New records are added to a novel view, and that view is dropped after the final insertion.

* Fixed a problem with dropping a relation that didn't exist. Incremental pytest is now passing.

* Updated firebolt.dbtspec to include incremental.

* Found an extra closing curly bracket in incremental.sql.

* Added two jinja files and updated incremental.sql to allow for correct processing of errors on schema changes.

* Fixed some syntax errors.

* Updated all logic to check for errors on schema changes and to log warnings.

* Disallow any schema changes whatsoever.

* Started to add macros and pseudocode for doing insert-overwrite.

* Fixed a bug/typo in incremental/strategies.sql.

* Cleaned up and added some error messages. Fixed logic for retrieval of partitions to drop.

* Changed get_response() to return 'SUCCESS' instead of 'False'.

* Clarified some comments in incremental.sql and also renamed some variables for ease of comprehension.

* Clean up error message output.

* Removed all log and print calls and updated changelog.

* Updated readme to show that we now support insert_overwrite.

* Moved  out of passing tests in firebolt.dbtspec.

* Updated firebolt.dbtspec to make comment clearer.

* Updated the changelog and version number.

* Bumped version number.

* Fixed queries on dropping partitions. The partitions were actually just a string, not an iterable, so trimmed first and last chars ('[' and ']'). Rest of string was correctly formatted.
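
The fix above notes that the queried partition list arrives as one plain string rather than an iterable, so the leading `[` and trailing `]` are trimmed off before use. A minimal sketch of that trim-and-split parsing (function name is hypothetical, not the adapter's actual helper):

```python
def parse_partition_string(partitions: str) -> list:
    """Parse a stringified partition list like "['a', 'b']" into its values.

    The query result is a single string, so drop the first and last
    characters ('[' and ']'), then split on commas, stripping
    whitespace and surrounding quotes from each value.
    """
    trimmed = partitions.strip()[1:-1]
    return [v.strip().strip("'") for v in trimmed.split(',') if v.strip()]

print(parse_partition_string("['2021-01-01', '2021-01-02']"))
# → ['2021-01-01', '2021-01-02']
```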

* Rewrote drop partition code to correctly format SQL query string.

* Removed log() statements.

* Trying to rectify logic of DROP PARTITION SQL so that it works equally well whether partitions specified in config files or from SQL query.

* Updated drop_partitions_sql macro so that it works with either queried data or data sent in from a config file.

* Changed log message.

* Finished logic on getting  to work, as well as .

* Fixed issue with values being truncated before partitions are dropped. Added a billion hyphens to jinja tags to make log output easier to read.

* Made a bunch of changes to get correct logic for parsing results from queries used in order to figure out which partitions to drop on an insert_overwrite incremental model. Also, some code cleanup and commenting.

* Having an issue with multiple columns in partitions. jinja is trying to do something like DROP PARTITION '['henry',, 'megan']'.

* Fixed bug with multiple columns in strategies.sql. Started trying to add in code to determine column types. Switching to new-integration-tests to complete that.

* Mistakenly made changes in firebolt__list_relations_without_caching that ought to be in firebolt__get_columns_in_relation. Now removed them completely.

* Added functions to convert list of SDK Column types to FireboltColumn types. Next step is to change output type of get_columns_in_relation from List[Agate.table] to List[FireboltColumn].

* Added a bunch of type 'annotations' to docstrings for jinja macros.

* Successfully diffing FireboltColumns with column and dtype. Wrote a python fn to check for date types and append ::DATE.
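
The date-type check described here can be sketched as follows (names and the dict-based signature are illustrative; the adapter's actual helper works on FireboltColumn objects rather than a plain dtype map):

```python
def annotate_dates(vals: str, col_names: list, col_types: dict) -> str:
    """Append ::DATE to any partition value whose column is date-typed.

    vals: comma-separated partition values, e.g. '2021-01-01,us'
    col_names: column names aligned positionally with the values
    col_types: map of column name -> dtype string
    """
    out = []
    for val, name in zip(vals.split(','), col_names):
        # Date and datetime columns need an explicit cast in the
        # generated DROP PARTITION SQL.
        if col_types.get(name) in ('date', 'datetime'):
            val += '::DATE'
        out.append(val)
    return ','.join(out)

print(annotate_dates('2021-01-01,us', ['dt', 'region'],
                     {'dt': 'date', 'region': 'text'}))
# → 2021-01-01::DATE,us
```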

* Found a couple of tiny issues, mostly in docstrings.

* Removed an extraneous line of code found by Sonar Cloud.

* Slightly optimized diff_column_data_types in column_helpers.sql.

* Walking back changes to return type of get_columns_in_relation. Is currently List[FireboltColumn], moving back to agate.Table.

* Finished with type changes. Also cleaned up some docstrings and comments.

* Was missing an opening curly brace on the macro get_columns_in_relation.

* Updated connection auth and required firebolt-sdk version.

* Updated impl.sdk_column_list_to_firebolt_column_list to convert types from Python to Firebolt internal types.

* Updated the changelog.

* Created new function create_type_string() in impl.py to correctly parse Firebolt array types. Updated changelog to acknowledge breaking change in auth required with firebolt-sdk >= 0.8.

* Changed floats to map to Firebolt DOUBLE types.

* Switched to using isinstance() rather than string conversion and find() to parse arrays.
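
The isinstance()-based array parsing this commit describes can be sketched as below. The `ARRAY` class here is a stand-in for the SDK's wrapper type from `firebolt.async_db._types`, and the type map is abridged; both are assumptions for illustration:

```python
class ARRAY:
    """Stand-in for the SDK's ARRAY wrapper type (illustrative only)."""
    def __init__(self, subtype):
        self.subtype = subtype

# Abridged Python-to-Firebolt type map.
TYPE_MAP = {'str': 'TEXT', 'int': 'LONG', 'float': 'DOUBLE',
            'date': 'DATE', 'datetime': 'DATE', 'bool': 'BOOLEAN'}

def create_type_string(type_code) -> str:
    # Peel off nested ARRAY wrappers with isinstance() instead of
    # converting to a string and calling find(), then fill the
    # innermost slot with the mapped scalar type name.
    template = '{}'
    while isinstance(type_code, ARRAY):
        template = f'ARRAY({template})'
        type_code = type_code.subtype
    return template.format(TYPE_MAP[type_code.__name__])

print(create_type_string(ARRAY(ARRAY(int))))  # → ARRAY(ARRAY(LONG))
```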

* Changed table build success output to all caps to match dbt's status text when running.

* Was accidentally checking for extracts as columns when figuring out date types. Now not checking for anything that isn't an _actual_ column.

* Updated firebolt-sdk to deal with fossa's httpx complaints.

* Minor changes to deal with comments by reviewer on PR.

* Edited a function argument name.

* Changed warning on missing  field to exception.

Co-authored-by: Eric Ford <[email protected]>
Co-authored-by: swanderz <[email protected]>
3 people authored Jun 10, 2022
1 parent 35a3e26 commit af4be25
Showing 14 changed files with 371 additions and 137 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,19 @@
# Changelog

## v.1.1.0

### Features

- dbt-firebolt now supports the insert/overwrite incremental model.

### Under the hood

- dbt-firebolt now requires firebolt-sdk>=0.8

### Breaking changes

- dbt-firebolt uses firebolt-sdk's new auth token method and requires firebolt-sdk >= 0.8

## v.1.0.6

### Under the hood
@@ -23,7 +37,7 @@

### Features

- dbt-firebolt now supports append-only incremental models.
- dbt-firebolt now supports the append-only incremental model.
- We now return better/more accurate responses from the cursor for logging/debugging.

### Under the hood
2 changes: 1 addition & 1 deletion README.md
@@ -43,7 +43,7 @@ The table below shows which dbt and Firebolt features are supported by the adapt
| Ephemeral materializations | :white_check_mark: |
| View materializations | :white_check_mark: |
| Incremental materializations - append | :white_check_mark: |
| Incremental materializations - insert_overwrite | :x: |
| Incremental materializations - insert_overwrite | :white_check_mark: |
| Incremental materializations - merge | :x: |
| Snapshots | :x: |
| Seeds | :white_check_mark: |
2 changes: 1 addition & 1 deletion dbt/adapters/firebolt/__init__.py
@@ -4,7 +4,7 @@
from dbt.adapters.firebolt.impl import FireboltAdapter
from dbt.include import firebolt

__version__ = '1.0.6'
__version__ = '1.1.0'

Plugin = AdapterPlugin(
adapter=FireboltAdapter,
4 changes: 4 additions & 0 deletions dbt/adapters/firebolt/column.py
@@ -8,3 +8,7 @@ class FireboltColumn(Column):
@classmethod
def string_type(cls, size: int) -> str:
return 'text'

def is_date(self) -> bool:
return self.dtype.lower() == 'date'
12 changes: 5 additions & 7 deletions dbt/adapters/firebolt/connections.py
@@ -9,6 +9,7 @@
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import RuntimeException
from firebolt.client import DEFAULT_API_URL
from firebolt.client.auth import UsernamePassword
from firebolt.db import connect


@@ -78,10 +79,9 @@ def open(cls, connection: SQLConnectionManager) -> SQLConnectionManager:
credentials = connection.credentials
# Create a connection based on provided credentials.
connection.handle = connect(
auth=UsernamePassword(credentials.user, credentials.password),
engine_name=credentials.engine_name,
database=credentials.database,
username=credentials.user,
password=credentials.password,
api_endpoint=credentials.api_endpoint,
account_name=credentials.account_name,
)
@@ -101,16 +101,14 @@ def get_response(cls, cursor: SQLConnectionManager) -> AdapterResponse:
"""
Return an AdapterResponse object. Note that I can't overload/extend it
as it's defined in dbt core and other internal fns complain if it has extra
fields. code field is missing for Firebolt adapter, as it's not returned
fields. `code` field is missing for Firebolt adapter, as it's not returned
from the SDK/API.
"""
success = 'False'
rowcount = cursor.rowcount
if cursor.rowcount == -1:
success = 'True'
rowcount = 0
return AdapterResponse(
_message=success,
_message='SUCCESS',
rows_affected=rowcount,
code=None,
)
@@ -123,7 +121,7 @@ def begin(self) -> None:

def commit(self) -> None:
"""
Passing `SQLConnectionManager.begin()` because
Passing `SQLConnectionManager.commit()` because
Firebolt does not yet support transactions.
"""

76 changes: 73 additions & 3 deletions dbt/adapters/firebolt/impl.py
@@ -12,6 +12,8 @@
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.sql import SQLAdapter
from dbt.dataclass_schema import ValidationError, dbtClassMixin
from firebolt.async_db._types import ARRAY
from firebolt.async_db._types import Column as SDKColumn

from dbt.adapters.firebolt.column import FireboltColumn
from dbt.adapters.firebolt.connections import FireboltConnectionManager
@@ -194,6 +196,45 @@ def stack_tables(self, tables_list: List[agate.Table]) -> agate.Table:
.exclude(['group'])
)

@available.parse_none
def sdk_column_list_to_firebolt_column_list(
self, columns: List[SDKColumn]
) -> List[FireboltColumn]:
"""
Extract and return list of FireboltColumns with names and data types
extracted from SDKColumns.
Args:
columns: list of Column types as defined in the Python SDK
"""
return [
FireboltColumn(
column=col.name, dtype=self.create_type_string(col.type_code)
)
for col in columns
]

@available.parse_none
def create_type_string(self, type_code: Any) -> str:
"""
Return properly formatted type string for SQL DDL.
Args: type_code is technically a type, but mypy complained that `type`
does not have an attribute `subtype`.
"""
types = {
'str': 'TEXT',
'int': 'LONG',
'float': 'DOUBLE',
'date': 'DATE',
'datetime': 'DATE',
'bool': 'BOOLEAN',
'Decimal': 'DECIMAL',
}
type_code_str = '{}'
while isinstance(type_code, ARRAY):
type_code_str = f'ARRAY({type_code_str})'
type_code = type_code.subtype
return type_code_str.format(types[type_code.__name__])

@available.parse_none
def filter_table(
cls, agate_table: agate.Table, col_name: str, re_match_exp: str
@@ -229,18 +270,47 @@ def get_rows_different_sql(
f'{relation_a}.{name} = {relation_b}.{name}' for name in names
]
where_clause = ' AND '.join(where_expressions)

columns_csv = ', '.join(names)

sql = COLUMNS_EQUAL_SQL.format(
columns=columns_csv,
relation_a=str(relation_a),
relation_b=str(relation_b),
where_clause=where_clause,
)

return sql

@available.parse_none
def annotate_date_columns_for_partitions(
self,
vals: str,
col_names: Union[List[str], str],
col_types: List[FireboltColumn],
) -> str:
"""
Return a list of partition values as a single string. All columns with
date types will be suffixed with ::DATE.
Args:
vals: a string of values separated by commas
col_names: either a list of strings or a single string, of the
names of the columns
col_types: Each FireboltColumn has fields for the name of the column
and its type.
"""
vals_list = vals.split(',')
# It's possible that col_names will be single column, in which case
# it might come in as a string.
if type(col_names) is str:
col_names = [col_names]
# Now map from column name to column type.
type_dict = {c.name: c.dtype for c in col_types}
for i in range(len(vals_list)):
if col_names[i] in type_dict and type_dict[col_names[i]] in [
'datetime',
'date',
]:
vals_list[i] += '::DATE'
return ','.join(vals_list)


COLUMNS_EQUAL_SQL = """
WITH diff_count AS (
101 changes: 64 additions & 37 deletions dbt/include/firebolt/macros/adapters.sql
@@ -137,28 +137,39 @@
{%- endmacro %}


{% macro sql_convert_columns_in_relation_firebolt(rows) -%}
{% set columns = [] %}
{% for row in rows %}
{% do columns.append(api.Column(*row)) %}
{% endfor %}
{{ return(columns) }}
{% endmacro %}


{% macro firebolt__get_columns_in_relation(relation) -%}
{# Return column information for table identified by relation
as Agate table. #}
{% call statement('get_columns_in_relation', fetch_result=True) %}

SELECT column_name,
data_type,
character_maximum_length,
numeric_precision_radix
FROM information_schema.columns
WHERE table_name = '{{ relation.identifier }}'
ORDER BY column_name
{% endcall %}
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{#-
Return column information for table identified by relation as
List[FireboltColumn].
Args: relation: dbt Relation
-#}
{% set sql %}

SELECT * FROM {{ relation }} LIMIT 1
{% endset %}
{#- add_query returns a cursor object. The names and types of the table named
by `relation` have to be converted to the correct type. -#}
{%- set (conn, cursor) = adapter.add_query(sql = sql, abridge_sql_log=True) -%}
{%- set ret_val = adapter.sdk_column_list_to_firebolt_column_list(
cursor.description
) -%}
{{ return(ret_val) }}
{% endmacro %}


{% macro firebolt__list_relations_without_caching(schema_relation) %}
{% macro firebolt__list_relations_without_caching(relation) %}
{# Return all views and tables as agate table.
Args:
schema_relation (dict): Contains values for database and schema.
relation (dict): Contains values for database and schema.

dbt has a relations cache. Using this macro will list all
the relations in the current schema using a direct DB query,
@@ -167,15 +178,15 @@
#}
{% call statement('list_tables_without_caching', fetch_result=True) %}

SELECT '{{ schema_relation.database }}' AS "database",
SELECT '{{ relation.database }}' AS "database",
table_name AS "name",
'{{ schema_relation.schema }}' AS "schema",
'{{ relation.schema }}' AS "schema",
'table' AS type
FROM information_schema.tables
UNION
SELECT '{{ schema_relation.database }}' AS "database",
SELECT '{{ relation.database }}' AS "database",
table_name AS "name",
'{{ schema_relation.schema }}' AS "schema",
'{{ relation.schema }}' AS "schema",
'view' AS type
FROM information_schema.views
{% endcall %}
@@ -184,41 +195,56 @@
{% endmacro %}


{% macro firebolt__create_table_as(temporary, relation, sql) -%}
{% macro firebolt__create_table_as(temporary,
relation,
select_sql) -%}
{# Create table using CTAS
Args:
temporary (bool): Unused, included so macro signature matches
that of dbt's default macro
relation (dbt relation/dict):
that of dbt's default macro
relation (dbt relation/dict)
select_sql (string): The SQL query that will be used to generate
the internal query of the CTAS
#}
{%- set table_type = config.get('table_type', default='dimension') | upper -%}
{%- set primary_index = config.get('primary_index') %}
{%- set primary_index = config.get('primary_index') -%}
{%- set incremental_strategy = config.get('incremental_strategy') -%}
{%- set partitions = config.get('partition_by') %}
CREATE {{ table_type }} TABLE IF NOT EXISTS {{ relation }}
{%- if primary_index %}
PRIMARY INDEX
{% if primary_index is iterable and primary_index is not string -%}
{{ primary_index | join(', ') }}
{%- else -%}
{{ primary_index }}
{%- endif -%}
{% endif %}
PRIMARY INDEX
{% if primary_index is iterable and primary_index is not string %}
{{ primary_index | join(', ') }}
{%- else -%}
{{ primary_index }}
{%- endif -%}
{%- endif -%}
{% if partitions %}
PARTITION BY
{% if partitions is iterable and partitions is not string %}
{{ partitions | join(', ') }}
{%- else -%}
{{ partitions }}
{%- endif -%}
{%- endif %}
AS (
{{ sql }}
{{ select_sql }}
)
{% endmacro %}
{% macro firebolt__create_view_as(relation, sql) %}
{% macro firebolt__create_view_as(relation, select_sql) %}
{#-
Return SQL string to create view.
Args:
relation (dict): dbt relation
sql (str): pre-generated SQL
Args:
relation (dict): dbt relation
select_sql (string): The SQL query that will be used to generate
the internal query of the CTAS
#}
CREATE OR REPLACE VIEW {{ relation.identifier }} AS (
{{ sql }}
{{ select_sql }}
)
{% endmacro %}
@@ -227,6 +253,7 @@
{#
Truncate relation. Actual macro is drop_relation in ./adapters/relation.sql.
#}
{# Firebolt doesn't currently support TRUNCATE, so DROP CASCADE.
This should only be called from reset_csv_table, where it's followed by
`create_csv_table`, so not recreating the table here. To retrieve old code,
@@ -1,11 +1,17 @@
{% macro intersect_columns(source_columns, target_columns) %}
{# Return list of columns that appear in both source and target. #}
{# Return a List[FireboltColumn] of columns that appear in both source and target.
Args:
source_columns: List[FireboltColumn]
target_columns: List[FireboltColumn]
#}
{% set result = [] %}
{# Note I'm using `column` and not `name` below, as name is a getter
and column is the actual field. #}
{% set source_names = source_columns | map(attribute = 'column') | list %}
{% set target_names = target_columns | map(attribute = 'column') | list %}
{# Check whether the name attribute exists in the target - this does
not perform a data type check. This is O(m•n), but cardinality
of columns is probably low enough that it doesn't matter. #}
{# Check whether the name attribute exists in the target - this does
not perform a data type check. This is O(m•n), but cardinality
of columns is probably low enough that it doesn't matter. #}
{% for sc in source_columns %}
{% if sc.name in target_names %}
{{ result.append(sc) }}