Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support reading delta tables with delta plugin #263

Merged
merged 22 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"NODE_VERSION": "none"
}
},

// Configure tool-specific properties.
"customizations": {
// Configure properties specific to VS Code.
Expand Down Expand Up @@ -43,21 +42,17 @@
}
}
},

// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance"
]
}
},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",

// Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode",
"postCreateCommand": "pip install -e . && pip install -r dev-requirements.txt"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ Please remember that using plugins may require you to add additional dependencie
* `gsheet` depends on `gspread` and `pandas`
* `iceberg` depends on `pyiceberg` and Python >= 3.8
* `sqlalchemy` depends on `pandas`, `sqlalchemy`, and the driver(s) you need
* `delta` depends on `deltalake`; see [this example project](https://github.com/milicevica23/dbt-duckdb-delta-plugin-demo) for usage

#### Using Local Python Modules

Expand Down
18 changes: 17 additions & 1 deletion dbt/adapters/duckdb/environments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,27 @@ def initialize_db(
return conn

@classmethod
def initialize_cursor(cls, creds: DuckDBCredentials, cursor):
def initialize_cursor(
    cls,
    creds: DuckDBCredentials,
    cursor,
    plugins: Optional[Dict[str, BasePlugin]] = None,
    registered_df: Optional[dict] = None,
):
    """Apply credentials settings, plugin state, and dataframe registrations
    to a freshly created cursor and return it.

    :param creds: credentials whose ``load_settings()`` entries are applied
        as DuckDB ``SET`` statements.
    :param cursor: a DuckDBPyConnection cursor copied from the parent connection.
    :param plugins: plugins whose ``configure_cursor`` re-applies state that is
        lost when a cursor is copied from the parent connection.
    :param registered_df: dataframes to (re-)register on this cursor by name;
        defaults to none. (A ``None`` default avoids the shared mutable-default
        pitfall of ``registered_df: dict = {}``.)
    :return: the configured cursor.
    """
    for key, value in creds.load_settings().items():
        # Okay to set these as strings because DuckDB will cast them
        # to the correct type
        cursor.execute(f"SET {key} = '{value}'")

    # update cursor if something is lost in the copy
    # of the parent connection
    if plugins:
        for plugin in plugins.values():
            plugin.configure_cursor(cursor)

    # registrations do not carry over from the parent connection, so
    # re-register any saved dataframes on this cursor
    for df_name, df in (registered_df or {}).items():
        cursor.register(df_name, df)

    return cursor

@classmethod
Expand Down
27 changes: 24 additions & 3 deletions dbt/adapters/duckdb/environments/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, credentials: credentials.DuckDBCredentials):
or credentials.path.startswith("md:")
or credentials.path.startswith("motherduck:")
)
self._REGISTERED_DF: dict = {}

def notify_closed(self):
with self.lock:
Expand All @@ -66,7 +67,10 @@ def handle(self):
if self.conn is None:
self.conn = self.initialize_db(self.creds, self._plugins)
self.handle_count += 1
cursor = self.initialize_cursor(self.creds, self.conn.cursor())

cursor = self.initialize_cursor(
self.creds, self.conn.cursor(), self._plugins, self._REGISTERED_DF
)
return DuckDBConnectionWrapper(cursor, self)

def submit_python_job(self, handle, parsed_model: dict, compiled_code: str) -> AdapterResponse:
Expand All @@ -87,6 +91,10 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
plugin = self._plugins[plugin_name]
handle = self.handle()
cursor = handle.cursor()

if source_config.schema:
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}")
milicevica23 marked this conversation as resolved.
Show resolved Hide resolved

save_mode = source_config.get("save_mode", "overwrite")
if save_mode in ("ignore", "error_if_exists"):
params = [source_config.schema, source_config.identifier]
Expand All @@ -106,10 +114,23 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
return
df = plugin.load(source_config)
assert df is not None
materialization = source_config.meta.get("materialization", "table")

materialization = source_config.meta.get(
"materialization", plugin.default_materialization()
)
source_table_name = source_config.table_name()
df_name = source_table_name.replace(".", "_") + "_df"

cursor.register(df_name, df)

if materialization == "view":
# save to df instance to register on each cursor creation
self._REGISTERED_DF[df_name] = df

cursor.execute(
f"CREATE OR REPLACE {materialization} {source_config.table_name()} AS SELECT * FROM df"
f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}"
)

cursor.close()
handle.close()

Expand Down
13 changes: 13 additions & 0 deletions dbt/adapters/duckdb/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,16 @@ def load(self, source_config: SourceConfig):

def store(self, target_config: TargetConfig):
raise NotImplementedError(f"store method not implemented for {self.name}")

def configure_cursor(self, cursor):
    """
    Configure each copy of the DuckDB cursor.

    Base-class default is a no-op. Subclasses should override this to
    provide additional attributes to the connection which are lost in
    the copy of the parent connection.

    :param cursor: A DuckDBPyConnection instance to be configured.
    """
    pass

def default_materialization(self) -> str:
    # Default materialization for sources loaded by this plugin; subclasses
    # may override (e.g. the delta plugin returns "view").
    return "table"
48 changes: 48 additions & 0 deletions dbt/adapters/duckdb/plugins/delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any
from typing import Dict

from deltalake import DeltaTable

from . import BasePlugin
from ..utils import SourceConfig


class Plugin(BasePlugin):
    """Plugin that exposes Delta Lake tables as dbt-duckdb sources via ``deltalake``."""

    def initialize(self, config: Dict[str, Any]):
        # No global setup is needed; all configuration comes from each
        # source's meta (delta_table_path, storage_options, ...).
        pass

    def configure_cursor(self, cursor):
        # The delta plugin keeps no per-cursor state to restore.
        pass

    def load(self, source_config: SourceConfig):
        """Load a Delta table and return it as a PyArrow dataset.

        Required source meta:
            delta_table_path: path/URI of the Delta table.
        Optional source meta:
            storage_options: mapping passed through to ``DeltaTable``.
            as_of_version: table version to time-travel to (0 is valid).
            as_of_datetime: timestamp to time-travel to.

        :raises Exception: if ``delta_table_path`` is missing.
        """
        if "delta_table_path" not in source_config:
            raise Exception("'delta_table_path' is a required argument for the delta table!")

        table_path = source_config["delta_table_path"]
        storage_options = source_config.get("storage_options", None)

        if storage_options:
            dt = DeltaTable(table_path, storage_options=storage_options)
        else:
            dt = DeltaTable(table_path)

        # delta attributes: compare against None, not truthiness, so that
        # version 0 (falsy but valid — and used in the functional tests)
        # is still honored.
        as_of_version = source_config.get("as_of_version", None)
        as_of_datetime = source_config.get("as_of_datetime", None)

        if as_of_version is not None:
            dt.load_version(as_of_version)

        if as_of_datetime is not None:
            dt.load_with_datetime(as_of_datetime)

        # Returned lazily as a pyarrow dataset; the environment registers
        # it on the DuckDB connection under the source's name.
        return dt.to_pyarrow_dataset()

    def default_materialization(self):
        # A view keeps the pyarrow dataset lazy instead of copying it
        # into a DuckDB table.
        return "view"


# Future
# TODO add databricks catalog
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ sqlalchemy
tox>=3.13
twine
wheel
deltalake
133 changes: 133 additions & 0 deletions tests/functional/plugins/test_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import pytest
from pathlib import Path
import pandas as pd
import tempfile

from dbt.tests.util import (
check_relations_equal,
run_dbt,
)
from deltalake.writer import write_deltalake

# dbt source definitions exercising the delta plugin; the {test_delta_path*}
# placeholders are filled by the `models` fixture with temp-dir table paths.
# NOTE(review): `as_of_version: 0` is a valid (falsy) version number — the
# plugin must compare it against None, not truthiness.
delta_schema_yml = """
version: 2
sources:
- name: delta_source
meta:
plugin: delta
tables:
- name: table_1
description: "An delta table"
meta:
delta_table_path: "{test_delta_path1}"

- name: delta_source_test
schema: test
meta:
plugin: delta
tables:
- name: table_2
description: "An delta table"
meta:
delta_table_path: "{test_delta_path2}"
as_of_version: 0
"""


# Models selecting from the delta sources.
delta1_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }}
"""
delta2_sql = """
{{ config(materialized='table') }}
select * from {{ source('delta_source', 'table_1') }} limit 1
"""
# NOTE(review): `select * as a` looks like invalid SQL (aliasing *), and the
# WHERE y = 'd' filter does not produce the row in delta3_sql_expected below;
# the corresponding equality check in test_plugins is commented out — verify intent.
delta3_sql = """
{{ config(materialized='table') }}
select * as a from {{ source('delta_source_test', 'table_2') }} WHERE y = 'd'
"""

delta3_sql_expected = """
select 1 as x, 'a' as y
"""


@pytest.mark.skip_profile("buenavista", "md")
class TestPlugins:
    """Functional tests for reading Delta Lake tables through the delta plugin."""

    @pytest.fixture(scope="class")
    def delta_test_table1(self):
        # Single-version delta table with one integer column.
        with tempfile.TemporaryDirectory() as tmpdir:
            table_path = Path(tmpdir) / "test_delta_table1"
            write_deltalake(table_path, pd.DataFrame({"x": [1, 2, 3]}), mode="overwrite")
            yield table_path

    @pytest.fixture(scope="class")
    def delta_test_table2(self):
        # Two-version delta table, so `as_of_version: 0` can time-travel.
        with tempfile.TemporaryDirectory() as tmpdir:
            table_path = Path(tmpdir) / "test_delta_table2"
            # version 0
            write_deltalake(table_path, pd.DataFrame({"x": [1], "y": ["a"]}), mode="overwrite")
            # version 1
            write_deltalake(
                table_path, pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}), mode="overwrite"
            )
            yield table_path

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target):
        # Enable the delta plugin on the dev duckdb target.
        dev_output = {
            "type": "duckdb",
            "path": dbt_profile_target.get("path", ":memory:"),
            "plugins": [{"module": "delta"}],
        }
        return {"test": {"outputs": {"dev": dev_output}, "target": "dev"}}

    @pytest.fixture(scope="class")
    def models(self, delta_test_table1, delta_test_table2):
        rendered_schema = delta_schema_yml.format(
            test_delta_path1=delta_test_table1,
            test_delta_path2=delta_test_table2,
        )
        return {
            "source_schema.yml": rendered_schema,
            "delta_table1.sql": delta1_sql,
            "delta_table2.sql": delta2_sql,
            "delta_table3.sql": delta3_sql,
            "delta_table3_expected.sql": delta3_sql_expected,
        }

    def test_plugins(self, project):
        results = run_dbt()
        assert len(results) == 4

        # TODO(review): the equality check between delta_table3 and
        # delta_table3_expected (and the row-count probe) is disabled;
        # re-enable once the delta3 model semantics are settled.
        # check_relations_equal(project.adapter, ["delta_table3", "delta_table3_expected"])
        # res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
        # assert res[0] == 2
Loading