From e99d3fd7b210df785627c649868b13c6d0c3cc5d Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Fri, 4 Oct 2024 14:18:59 -0500 Subject: [PATCH] [fivetran] add set_kind_tag_for_service toggle --- .../dagster-fivetran/dagster_fivetran/asset_defs.py | 13 ++++++++++--- .../test_load_from_instance.py | 9 ++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 797c936845618..f7e0d449e5e44 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -362,7 +362,7 @@ def build_asset_defn_metadata( extra_metadata={ "connector_id": self.connector_id, "io_manager_key": io_manager_key, - "storage_kind": self.service, + "service": self.service, }, ) @@ -373,11 +373,12 @@ def _build_fivetran_assets_from_metadata( poll_interval: float, poll_timeout: Optional[float], fetch_column_metadata: bool, + set_kind_tag_for_service: bool, ) -> AssetsDefinition: metadata = cast(Mapping[str, Any], assets_defn_meta.extra_metadata) connector_id = cast(str, metadata["connector_id"]) io_manager_key = cast(Optional[str], metadata["io_manager_key"]) - storage_kind = cast(Optional[str], metadata.get("storage_kind")) + service = cast(Optional[str], metadata.get("service")) return _build_fivetran_assets( connector_id=connector_id, @@ -396,7 +397,7 @@ def _build_fivetran_assets_from_metadata( group_name=assets_defn_meta.group_name, poll_interval=poll_interval, poll_timeout=poll_timeout, - asset_tags=build_kind_tag(storage_kind) if storage_kind else None, + asset_tags=build_kind_tag(service) if set_kind_tag_for_service and service else None, fetch_column_metadata=fetch_column_metadata, infer_missing_tables=False, op_tags=None, @@ -416,6 +417,7 @@ def __init__( poll_interval: float, poll_timeout: Optional[float], fetch_column_metadata: bool, + set_kind_tag_for_service: bool, ): self._fivetran_resource_def = fivetran_resource_def if isinstance(fivetran_resource_def, FivetranResource): @@ -445,6 +447,7 @@ def __init__( self._poll_interval = poll_interval self._poll_timeout = poll_timeout self._fetch_column_metadata = fetch_column_metadata + self._set_kind_tag_for_service = set_kind_tag_for_service contents = hashlib.sha1() contents.update(",".join(key_prefix).encode("utf-8")) @@ -535,6 +538,7 @@ def build_definitions( poll_interval=self._poll_interval, poll_timeout=self._poll_timeout, fetch_column_metadata=self._fetch_column_metadata, + set_kind_tag_for_service=self._set_kind_tag_for_service, ) for meta in data ] @@ -559,6 +563,7 @@ def load_assets_from_fivetran_instance( poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, fetch_column_metadata: bool = True, + set_kind_tag_for_service: bool = True, ) -> CacheableAssetsDefinition: """Loads Fivetran connector assets from a configured FivetranResource instance. This fetches information about defined connectors at initialization time, and will error on workspace load if the Fivetran @@ -588,6 +593,7 @@ def load_assets_from_fivetran_instance( timed out. By default, this will never time out. fetch_column_metadata (bool): If True, will fetch column schema information for each table in the connector. This will induce additional API calls. + set_kind_tag_for_service (bool): If True, will set a kind tag on the asset for the destination service. **Examples:** @@ -644,4 +650,5 @@ def load_assets_from_fivetran_instance( poll_interval=poll_interval, poll_timeout=poll_timeout, fetch_column_metadata=fetch_column_metadata, + set_kind_tag_for_service=set_kind_tag_for_service, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index 872e38412337d..f7fc1d320c6f6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -56,12 +56,14 @@ ) @pytest.mark.parametrize("multiple_connectors", [True, False]) @pytest.mark.parametrize("destination_ids", [None, [], ["some_group"]]) +@pytest.mark.parametrize("set_kind_tag_for_service", [True, False]) def test_load_from_instance( connector_to_group_fn, filter_connector, connector_to_asset_key_fn, multiple_connectors, destination_ids, + set_kind_tag_for_service, ) -> None: with environ({"FIVETRAN_API_KEY": "some_key", "FIVETRAN_API_SECRET": "some_secret"}): load_calls = [] @@ -132,6 +134,7 @@ def load_input(self, context: InputContext) -> Any: connector_to_io_manager_key_fn=(lambda _: "test_io_manager"), poll_interval=10, poll_timeout=600, + set_kind_tag_for_service=set_kind_tag_for_service, ) else: ft_cacheable_assets = load_assets_from_fivetran_instance( @@ -141,6 +144,7 @@ def load_input(self, context: InputContext) -> Any: io_manager_key="test_io_manager", poll_interval=10, poll_timeout=600, + set_kind_tag_for_service=set_kind_tag_for_service, ) ft_assets = ft_cacheable_assets.build_definitions( ft_cacheable_assets.compute_cacheable_data() @@ -216,7 +220,10 @@ def downstream_asset(xyz): assert metadata.get("dagster/relation_identifier") == ( "example_database." + ".".join(key.path[-2:]) ) - assert has_kind(assets_def.tags_by_key[key], "snowflake") + if set_kind_tag_for_service: + assert has_kind(assets_def.tags_by_key[key], "snowflake") + else: + assert not has_kind(assets_def.tags_by_key[key], "snowflake") assert ft_assets[0].keys == tables assert all(