-
-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support DBT Manifests from Snowflake Stage #116
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import gzip | ||
import json | ||
import tempfile | ||
from pathlib import PurePath, Path, PosixPath, PurePosixPath | ||
from typing import Dict | ||
|
||
from dbt.config.runtime import load_profile | ||
from dbt.flags import get_flags | ||
from dbt.mp_context import get_mp_context | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: I suspect that dbt-core API instability over time might require a different import for older versions of dbt-core. I'd recommend digging into dbt-core 1.6 and 1.7 to confirm. |
||
from pydantic import BaseModel | ||
|
||
|
||
class SnowflakeReferenceConfig(BaseModel): | ||
"""Configuration for an reference stored in Snowflake Stage""" | ||
|
||
stage: str | ||
stage_path: str | ||
|
||
|
||
class SnowflakeClient: | ||
"""A client for loading manifest files from Snowflake Stage.""" | ||
|
||
def __init__(self, stage: str, stage_path: str) -> None: | ||
self.stage = stage | ||
self.stage_path = stage_path | ||
|
||
def load_manifest(self) -> Dict: | ||
"""Load the manifest.json file from Snowflake stage.""" | ||
|
||
# Import locally to not require dbt-snowflake to be installed | ||
from dbt.adapters.snowflake import SnowflakeAdapter | ||
Comment on lines
+30
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thought: What happens here if adapters isn't installed? Should we throw an exception and a nice log line? |
||
|
||
flags = get_flags() | ||
profile = load_profile( | ||
project_root=flags.PROJECT_DIR, | ||
cli_vars=flags.VARS, | ||
profile_name_override=flags.PROFILE, | ||
target_override=flags.TARGET, | ||
) | ||
adapter = SnowflakeAdapter(profile, get_mp_context()) | ||
Comment on lines
+33
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Praise: This is so dang clever. Fantastic work. |
||
file_name = str(PurePosixPath(self.stage_path).name) | ||
tmp_dir = tempfile.mkdtemp(prefix="dbt_loom_") | ||
# Snowflake needs '/' path separators | ||
tmp_dir_sf = tmp_dir.replace("\\", "/") | ||
|
||
with adapter.connection_named("dbt-loom"): | ||
get_query = f"get @{self.stage}/{self.stage_path} file://{tmp_dir_sf}/" | ||
response, table = adapter.connections.execute(get_query) | ||
if response.rows_affected == 0: | ||
raise Exception( | ||
f"Failed to get file {self.stage}/{self.stage_path}: {response}" | ||
) | ||
Comment on lines
+42
to
+52
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Am I correct in believing that this will download the file from the Snowflake stage into a temporary file? |
||
|
||
download_path = Path(tmp_dir) / file_name | ||
|
||
if download_path.name.endswith(".gz"): | ||
with gzip.GzipFile(download_path) as gzip_file: | ||
content = gzip_file.read().decode("utf-8") | ||
else: | ||
with download_path.open("r") as f: | ||
content = f.read() | ||
|
||
return json.loads(content) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Praise: Thanks for adding an example for how to configure this manifest source.