Add support for loading dbt project from cloud store using Airflow Object Store #1148

Closed · wants to merge 21 commits

Changes from 20 commits
50 changes: 26 additions & 24 deletions cosmos/config.py
@@ -150,6 +150,7 @@ class ProjectConfig:
     def __init__(
         self,
         dbt_project_path: str | Path | None = None,
+        dbt_project_conn_id: str | None = None,
         models_relative_path: str | Path = "models",
         seeds_relative_path: str | Path = "seeds",
         snapshots_relative_path: str | Path = "snapshots",
@@ -170,39 +171,40 @@ def __init__(
         )
         if project_name:
             self.project_name = project_name
+
+        if manifest_path:
+            self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id)
         if dbt_project_path:
-            self.dbt_project_path = Path(dbt_project_path)
+            self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id)
             self.models_path = self.dbt_project_path / Path(models_relative_path)
             self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
             self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
             if not project_name:
                 self.project_name = self.dbt_project_path.stem

-        if manifest_path:
-            manifest_path_str = str(manifest_path)
-            if not manifest_conn_id:
-                manifest_scheme = manifest_path_str.split("://")[0]
-                # Use the default Airflow connection ID for the scheme if it is not provided.
-                manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)()
+        self.env_vars = env_vars
+        self.dbt_vars = dbt_vars
+        self.partial_parse = partial_parse

-        if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
-            raise CosmosValueError(
-                f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object "
-                f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
-                f"Airflow 2.8 or later."
-            )
+    def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path:
+        property_str = str(property)
+        if not property_conn_id:
+            scheme = property_str.split("://")[0]
+            # Use the default Airflow connection ID for the scheme if it is not provided.
+            property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)()

-        if AIRFLOW_IO_AVAILABLE:
-            from airflow.io.path import ObjectStoragePath
+        if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
+            raise CosmosValueError(
+                f"The path {property_str} uses a remote file scheme, but the required Object "
+                f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
+                f"Airflow 2.8 or later."
+            )

-            self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
-        else:
-            self.manifest_path = Path(manifest_path_str)
+        if AIRFLOW_IO_AVAILABLE and property_conn_id:
+            from airflow.io.path import ObjectStoragePath

-        self.env_vars = env_vars
-        self.dbt_vars = dbt_vars
-        self.partial_parse = partial_parse
+            return ObjectStoragePath(property_str, conn_id=property_conn_id)
+        else:
+            return Path(property_str)

     def validate_project(self) -> None:
         """
@@ -224,8 +226,8 @@ def validate_project(self) -> None:
         if self.dbt_project_path:
             project_yml_path = self.dbt_project_path / "dbt_project.yml"
             mandatory_paths = {
-                "dbt_project.yml": Path(project_yml_path) if project_yml_path else None,
-                "models directory ": Path(self.models_path) if self.models_path else None,
Comment on lines -230 to -231 by @CorsettiS (Contributor, Author), Aug 9, 2024:

    Honestly speaking, I could not understand this even after reading the comment. Both self.dbt_project_path and self.models_path are already Path, as specified in __init__, so I just reverted it to its previous state.
"dbt_project.yml": project_yml_path,
"models directory": self.models_path,
}
if self.manifest_path:
mandatory_paths["manifest"] = self.manifest_path
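For reference (not part of the diff), a simplified sketch of the scheme-to-default-connection lookup that get_property_from_cloud_or_local relies on. It assumes the map stores zero-argument callables, matching the .get(scheme, lambda: None)() call pattern above; the default connection IDs shown are the ones asserted in tests/test_config.py below.

    # Simplified sketch; the real FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP lives in
    # cosmos and may resolve these values lazily from provider hooks.
    FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
        "s3": lambda: "aws_default",
        "gs": lambda: "google_cloud_default",
        "abfs": lambda: "wasb_default",
    }

    def default_conn_id_for(path: str) -> str | None:
        # "s3://bucket/key" -> "s3"; a plain local path has no "://", so the whole
        # string is used as the scheme, misses the map, and yields None.
        scheme = path.split("://")[0]
        return FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)()

With no connection ID and a local path, the helper falls through to returning a plain pathlib.Path, so existing local-path behaviour is unchanged.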
8 changes: 7 additions & 1 deletion docs/configuration/project-config.rst
@@ -4,7 +4,11 @@ Project Config
 The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located and project
 variables that should be used for rendering and execution. It takes the following arguments:

-- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file
+- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file.
+  Along with supporting local paths, starting with Cosmos 1.6.0, if you have Airflow >= 2.8.0, Cosmos also supports
+  remote paths for your dbt project (e.g. an S3 URL).
+- ``dbt_project_conn_id``: The connection ID for the Airflow connection that contains the credentials for the remote
+  dbt project. This is only required if you're using a remote dbt project path.
 - ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to
   ``models/``
 - ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to
@@ -14,6 +18,8 @@ variables that should be used for rendering and execution. It takes the following arguments:
 - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest
   parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you have
   Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing (e.g. an S3 URL). See :ref:`parsing-methods` for more details.
+- ``manifest_conn_id``: The connection ID for the Airflow connection that contains the credentials for the remote
+  manifest path. This is only required if you're using a remote manifest path.
 - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the
   folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided,
   ``project_name`` is required as the name cannot be inferred from ``dbt_project_path``
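For illustration (not part of the diff), a minimal sketch of how these arguments fit together once this PR lands; the bucket, project layout, and connection ID below are hypothetical:

    from cosmos.config import ProjectConfig

    # Hypothetical S3 bucket and Airflow connection, for illustration only.
    # Remote paths require Airflow >= 2.8.0 and Cosmos >= 1.6.0.
    project_config = ProjectConfig(
        dbt_project_path="s3://my-company-dbt/jaffle_shop",
        dbt_project_conn_id="aws_s3_conn",
        manifest_path="s3://my-company-dbt/jaffle_shop/target/manifest.json",
        manifest_conn_id="aws_s3_conn",
    )

If the connection IDs are omitted, Cosmos falls back to the scheme's default Airflow connection (e.g. aws_default for s3:// paths).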
49 changes: 42 additions & 7 deletions tests/test_config.py
@@ -36,12 +36,8 @@ def test_init_with_manifest_path_and_project_path_succeeds():
     project_name in this case should be based on dbt_project_path
     """
     project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json")
-    if AIRFLOW_IO_AVAILABLE:
-        from airflow.io.path import ObjectStoragePath
-
-        assert project_config.manifest_path == ObjectStoragePath("target/manifest.json")
-    else:
-        assert project_config.manifest_path == Path("target/manifest.json")
+    assert project_config.manifest_path == Path("target/manifest.json")
     assert project_config.dbt_project_path == Path("/tmp/some-path")
     assert project_config.project_name == "some-path"

Review thread on the removed if AIRFLOW_IO_AVAILABLE: branch:

@CorsettiS (Contributor, Author): This test is actually wrong, since AIRFLOW_IO_AVAILABLE just means we are using Airflow >= 2.8.0, and not that we are using Object Storage.

@tatiana (Collaborator): [comment body not captured]

@CorsettiS (Contributor, Author): @tatiana indeed, although using Airflow 2.8 does not exactly mean we are using Object Storage, which was the assumption of this test, so I just fixed it.


@@ -324,10 +320,49 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manifest_conn_id):
         from airflow.version import version as airflow_version

         error_msg = (
-            f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is "
+            f"The path {manifest_path} uses a remote file scheme, but the required Object Storage feature is "
             f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later."
         )
         with pytest.raises(CosmosValueError, match=error_msg):
             _ = ProjectConfig(
                 dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id
             )


+@pytest.mark.parametrize(
+    "dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name",
+    [
+        ("s3://cosmos-dbt-project-test/test-project", None, "aws_default", "custom-project-name"),
+        ("s3://cosmos-dbt-project-test/test-project", "aws_s3_conn", "aws_s3_conn", None),
+        ("gs://cosmos-dbt-project-test/test-project", None, "google_cloud_default", "custom-project-name"),
+        ("gs://cosmos-dbt-project-test/test-project", "gcp_gs_conn", "gcp_gs_conn", None),
+        ("abfs://cosmos-dbt-project-test/test-project", None, "wasb_default", "custom-project-name"),
+        ("abfs://cosmos-dbt-project-test/test-project", "azure_abfs_conn", "azure_abfs_conn", None),
+    ],
+)
+def test_remote_dbt_project_path(dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name):
+    if AIRFLOW_IO_AVAILABLE:
+        project_config = ProjectConfig(
+            dbt_project_path=dbt_project_path,
+            dbt_project_conn_id=given_dbt_project_conn_id,
+            manifest_path="/some/manifest.json",
+            project_name=project_name,
+        )
+
+        from airflow.io.path import ObjectStoragePath
+
+        assert project_config.dbt_project_path == ObjectStoragePath(dbt_project_path, conn_id=used_dbt_project_conn_id)
+        assert project_config.project_name == (project_name if project_name else "test-project")
+    else:
+        from airflow.version import version as airflow_version
+
+        error_msg = (
+            f"The path {dbt_project_path} uses a remote file scheme, but the required Object Storage feature is "
+            f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later."
+        )
+        with pytest.raises(CosmosValueError, match=error_msg):
+            _ = ProjectConfig(
+                dbt_project_path=dbt_project_path,
+                dbt_project_conn_id=given_dbt_project_conn_id,
+                manifest_path="/some/manifest.json",
+            )
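As context for the assertions above (not part of the diff), a minimal sketch of how the ObjectStoragePath built for a remote dbt project behaves on Airflow >= 2.8; the bucket and connection ID are the hypothetical values used by the test fixtures:

    from airflow.io.path import ObjectStoragePath

    path = ObjectStoragePath("s3://cosmos-dbt-project-test/test-project", conn_id="aws_default")

    # ObjectStoragePath mirrors the pathlib.Path API, so ProjectConfig.__init__ can
    # join subdirectories and infer the project name exactly as it does locally:
    models_path = path / "models"
    print(path.stem)  # "test-project" -> used as the inferred project_name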