Skip to content

Commit

Permalink
Add support for loading manifest from cloud store using Airflow Object Store
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Jul 21, 2024
1 parent e61f3a3 commit c2cb5e3
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 10 deletions.
36 changes: 28 additions & 8 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
from pathlib import Path
from typing import Any, Callable, Iterator

from airflow.version import version as airflow_version

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
DbtResourceType,
ExecutionMode,
InvocationMode,
Expand All @@ -24,6 +27,7 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping
from cosmos.settings import AIRFLOW_IO_AVAILABLE

logger = get_logger(__name__)

Expand Down Expand Up @@ -150,6 +154,7 @@ def __init__(
seeds_relative_path: str | Path = "seeds",
snapshots_relative_path: str | Path = "snapshots",
manifest_path: str | Path | None = None,
manifest_conn_id: str | None = None,
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
Expand All @@ -175,7 +180,25 @@ def __init__(
self.project_name = self.dbt_project_path.stem

if manifest_path:
self.manifest_path = Path(manifest_path)
manifest_path_str = str(manifest_path)
if not manifest_conn_id:
manifest_scheme = manifest_path_str.split("://")[0]
# Use the default Airflow connection ID for the scheme if it is not provided.
manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, None)

if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)

if AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id)
else:
self.manifest_path = Path(manifest_path_str)

self.env_vars = env_vars
self.dbt_vars = dbt_vars
Expand All @@ -196,24 +219,21 @@ def validate_project(self) -> None:
if self.dbt_project_path:
project_yml_path = self.dbt_project_path / "dbt_project.yml"
mandatory_paths = {
"dbt_project.yml": project_yml_path,
"models directory ": self.models_path,
"dbt_project.yml": Path(project_yml_path) if project_yml_path else None,
"models directory ": Path(self.models_path) if self.models_path else None,
}
if self.manifest_path:
mandatory_paths["manifest"] = self.manifest_path

for name, path in mandatory_paths.items():
if path is None or not Path(path).exists():
if path is None or not path.exists():
raise CosmosValueError(f"Could not find {name} at {path}")

def is_manifest_available(self) -> bool:
    """
    Check if the `dbt` project manifest is set and if the file exists.

    Returns ``False`` when ``manifest_path`` is unset (``None``); otherwise
    delegates to ``.exists()`` on the path object (a ``pathlib.Path`` or,
    when Airflow Object Storage is available, an ``ObjectStoragePath``,
    which transparently checks the remote store).
    """
    # The diff artifact left both the old (early-return) and new (ternary)
    # implementations in place, producing an unreachable duplicate return;
    # keep only the single-expression form.
    return self.manifest_path.exists() if self.manifest_path else False


@dataclass
Expand Down
14 changes: 14 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from pathlib import Path

import aenum
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
Expand All @@ -28,6 +31,17 @@
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


# URL schemes (the part before "://") recognised for remote manifest paths.
S3_FILE_SCHEME = "s3"
GS_FILE_SCHEME = "gs"
ABFS_FILE_SCHEME = "abfs"

# Maps a remote file scheme to the default Airflow connection ID of the
# provider hook that serves it. Consulted in ``cosmos/config.py`` to pick a
# fallback ``conn_id`` when the user supplies a remote ``manifest_path``
# without an explicit ``manifest_conn_id``.
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = {
    S3_FILE_SCHEME: S3Hook.default_conn_name,
    GS_FILE_SCHEME: GCSHook.default_conn_name,
    ABFS_FILE_SCHEME: AzureDataLakeHook.default_conn_name,
}


class LoadMode(Enum):
"""
Supported ways to load a `dbt` project into a `DbtGraph` instance.
Expand Down
8 changes: 6 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from functools import cached_property
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any
from typing import TYPE_CHECKING, Any

from airflow.models import Variable

Expand Down Expand Up @@ -594,7 +594,11 @@ def load_from_dbt_manifest(self) -> None:
raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path")

nodes = {}
with open(self.project.manifest_path) as fp: # type: ignore[arg-type]

if TYPE_CHECKING:
assert self.project.manifest_path is not None

with self.project.manifest_path.open() as fp:
manifest = json.load(fp)

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
Expand Down
4 changes: 4 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import airflow
from airflow.configuration import conf
from airflow.version import version as airflow_version
from packaging.version import Version

from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE

Expand All @@ -24,3 +26,5 @@
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

# Airflow Object Storage (airflow.io) was introduced in Airflow 2.8.0;
# gate any ObjectStoragePath usage on this flag.
AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

0 comments on commit c2cb5e3

Please sign in to comment.