Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for loading dbt project from cloud store using Airflow Object Store #1148

Closed
wants to merge 21 commits into from
Closed
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class ProjectConfig:
def __init__(
self,
dbt_project_path: str | Path | None = None,
dbt_project_conn_id: str | None = None,
models_relative_path: str | Path = "models",
seeds_relative_path: str | Path = "seeds",
snapshots_relative_path: str | Path = "snapshots",
Expand All @@ -172,10 +173,30 @@ def __init__(
self.project_name = project_name

if dbt_project_path:
self.dbt_project_path = Path(dbt_project_path)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
dbt_project_path_str = str(dbt_project_path)
if not dbt_project_conn_id:
self.dbt_project_path = Path(dbt_project_path)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
elif dbt_project_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The dbt project path {dbt_project_path_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)
elif AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
else:
self.dbt_project_path = Path(dbt_project_path_str)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
if not project_name:
self.project_name = self.dbt_project_path.stem

Expand Down Expand Up @@ -224,8 +245,8 @@ def validate_project(self) -> None:
if self.dbt_project_path:
project_yml_path = self.dbt_project_path / "dbt_project.yml"
mandatory_paths = {
"dbt_project.yml": Path(project_yml_path) if project_yml_path else None,
"models directory ": Path(self.models_path) if self.models_path else None,
Comment on lines -230 to -231
Copy link
Contributor Author

@CorsettiS CorsettiS Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly speaking I could not understand this even after reading the comment. both the self.dbt_project_path and self.models_path are already Path as specified in the init. I just reverted to its previous state

"dbt_project.yml": project_yml_path,
"models directory": self.models_path,
}
if self.manifest_path:
mandatory_paths["manifest"] = self.manifest_path
Expand Down
Loading